crash_gen.py 108.6 KB
Newer Older
S
Shuduo Sang 已提交
1
# -----!/usr/bin/python3.7
S
Steven Li 已提交
2 3 4 5 6 7 8 9 10 11 12 13
###################################################################
#           Copyright (c) 2016 by TAOS Technologies, Inc.
#                     All rights reserved.
#
#  This file is proprietary and confidential to TAOS Technologies.
#  No part of this file may be reproduced, stored, transmitted,
#  disclosed or used in any form or by any means other than as
#  expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-
S
Shuduo Sang 已提交
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
# For type hinting before definition, ref:
# https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel
from __future__ import annotations
import taos
import crash_gen
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.log import *
from queue import Queue, Empty
from typing import IO
from typing import Set
from typing import Dict
from typing import List
from requests.auth import HTTPBasicAuth
import textwrap
import datetime
import logging
import time
import random
import threading
import requests
import copy
import argparse
import getopt
39

S
Steven Li 已提交
40
import sys
41
import os
42 43
import io
import signal
44
import traceback
45 46 47 48 49 50 51

try:
    import psutil
except ImportError:  # only a missing module should trigger this hint, not Ctrl-C etc.
    print("Psutil module needed, please install: sudo pip3 install psutil")
    sys.exit(-1)

# Require Python 3
# NOTE(review): this file starts with `from __future__ import annotations`, which
# Python 2 rejects at compile time, so this check is effectively documentation.
if sys.version_info[0] < 3:
    raise Exception("Must be using Python 3")


# Global variables, tried to keep a small number.

# Command-line/Environment Configurations, will set a bit later
# ConfigNameSpace = argparse.Namespace
gConfig = argparse.Namespace()  # Dummy value, will be replaced later
gSvcMgr = None  # TODO: refactor this hack, use dep injection
logger = None  # assigned later in this file (outside this view) before use
S
Steven Li 已提交
64

S
Shuduo Sang 已提交
65
def runThread(wt: WorkerThread):
    """Entry point handed to threading.Thread: delegate to the worker's run loop."""
    threadBody = wt.run
    threadBody()
67

68 69
class CrashGenError(Exception):
    """Error raised by the crash generator, optionally carrying an error number."""

    def __init__(self, msg=None, errno=None):
        # Forward both values to Exception so ``args``, repr() and pickling
        # preserve them (previously ``args`` was always empty).
        super().__init__(msg, errno)
        self.msg = msg
        self.errno = errno

    def __str__(self):
        # __str__ must return a str; with no message this used to return None,
        # which made str(err) itself raise TypeError.
        return "" if self.msg is None else str(self.msg)

S
Shuduo Sang 已提交
76

S
Steven Li 已提交
77
class WorkerThread:
    """A worker that executes one task per step, in lock-step with the main thread.

    Each loop iteration waits at the coordinator's shared step barrier, then at
    this worker's own step gate until the main thread taps it, then fetches one
    task from the ThreadCoordinator and executes it.
    """

    def __init__(self, pool: ThreadPool, tid, tc: ThreadCoordinator,
                 # te: TaskExecutor,
                 ):  # note: main thread context!
        self._pool = pool
        self._tid = tid
        self._tc = tc  # type: ThreadCoordinator
        self._thread = threading.Thread(target=runThread, args=(self,))
        self._stepGate = threading.Event()

        # Let us have a DB connection of our own
        if (gConfig.per_thread_db_connection):  # type: ignore
            if gConfig.connector_type == 'native':
                self._dbConn = DbConn.createNative()
            elif gConfig.connector_type == 'rest':
                self._dbConn = DbConn.createRest()
            elif gConfig.connector_type == 'mixed':
                if Dice.throw(2) == 0:  # 1/2 chance
                    self._dbConn = DbConn.createNative()
                else:
                    self._dbConn = DbConn.createRest()
            else:
                raise RuntimeError("Unexpected connector type: {}".format(gConfig.connector_type))

        self._dbInUse = False  # if "use db" was executed already

    def logDebug(self, msg):
        logger.debug("    TRD[{}] {}".format(self._tid, msg))

    def logInfo(self, msg):
        logger.info("    TRD[{}] {}".format(self._tid, msg))

    def dbInUse(self):
        return self._dbInUse

    def useDb(self):
        """Execute "use db" once; repeated calls are no-ops until the flag is reset."""
        if (not self._dbInUse):
            self.execSql("use db")
        self._dbInUse = True

    def getTaskExecutor(self):
        return self._tc.getTaskExecutor()

    def start(self):
        self._thread.start()  # AFTER the thread is recorded

    def run(self):
        """Thread body, runs in the worker thread's own context.

        Opens the per-thread DB connection (when configured), runs the step loop
        and always closes that connection afterwards, even if the loop raised.
        """
        logger.info("Starting to run thread: {}".format(self._tid))

        if (gConfig.per_thread_db_connection):  # type: ignore
            logger.debug("Worker thread openning database connection")
            self._dbConn.open()

        try:
            self._doTaskLoop()
        finally:
            # clean up; previously skipped when the task loop raised, leaking the connection
            if (gConfig.per_thread_db_connection):  # type: ignore
                if self._dbConn.isOpen:  # sometimes it is not open
                    self._dbConn.close()
                else:
                    logger.warning("Cleaning up worker thread, dbConn already closed")

    def _doTaskLoop(self):
        """Barrier -> gate -> fetch/execute one task, until told to stop or timed out."""
        tc = self._tc  # Thread Coordinator, the overall master
        while True:
            try:
                tc.crossStepBarrier()  # shared barrier first, INCLUDING the last one
            except threading.BrokenBarrierError:  # main thread timed out
                print("_bto", end="")
                logger.debug("[TRD] Worker thread exiting due to main thread barrier time-out")
                break

            logger.debug("[TRD] Worker thread [{}] exited barrier...".format(self._tid))
            self.crossStepGate()   # then per-thread gate, after being tapped
            logger.debug("[TRD] Worker thread [{}] exited step gate...".format(self._tid))
            if not tc.isRunning():
                print("_wts", end="")
                logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...")
                break

            # Fetch a task from the Thread Coordinator
            logger.debug( "[TRD] Worker thread [{}] about to fetch task".format(self._tid))
            task = tc.fetchTask()

            # Execute such a task
            logger.debug(
                "[TRD] Worker thread [{}] about to execute task: {}".format(
                    self._tid, task.__class__.__name__))
            task.execute(self)
            tc.saveExecutedTask(task)
            logger.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid))

            self._dbInUse = False  # there may be changes between steps

    def verifyThreadSelf(self):  # ensure we are called by this own thread
        if (threading.get_ident() != self._thread.ident):
            raise RuntimeError("Unexpectly called from other threads")

    def verifyThreadMain(self):  # ensure we are called by the main thread
        if (threading.get_ident() != threading.main_thread().ident):
            raise RuntimeError("Unexpectly called from other threads")

    def verifyThreadAlive(self):
        if (not self._thread.is_alive()):
            raise RuntimeError("Unexpected dead thread")

    # A gate is different from a barrier in that a thread needs to be "tapped"
    def crossStepGate(self):
        """Block this worker at its private gate until the main thread taps it."""
        self.verifyThreadAlive()
        self.verifyThreadSelf()  # only allowed by ourselves

        # Wait again at the "gate", waiting to be "tapped"
        logger.debug(
            "[TRD] Worker thread {} about to cross the step gate".format(
                self._tid))
        self._stepGate.wait()
        self._stepGate.clear()

    def tapStepGate(self):  # give it a tap, release the thread waiting there
        """Release this worker from its gate; only the main thread may call this."""
        self.verifyThreadMain()  # only allowed for main thread

        if self._thread.is_alive():
            logger.debug("[TRD] Tapping worker thread {}".format(self._tid))
            self._stepGate.set()  # wake up!
            time.sleep(0)  # let the released thread run a bit
        else:
            print("_tad", end="")  # Thread already dead

    def execSql(self, sql):  # TODO: expose DbConn directly
        return self.getDbConn().execute(sql)

    def querySql(self, sql):  # TODO: expose DbConn directly
        return self.getDbConn().query(sql)

    def getQueryResult(self):
        return self.getDbConn().getQueryResult()

    def getDbConn(self):
        """Per-thread connection when configured, else the DbManager's shared one."""
        if (gConfig.per_thread_db_connection):
            return self._dbConn
        else:
            return self._tc.getDbManager().getDbConn()
236

237
# The coordinator of all worker threads, mostly running in main thread
S
Shuduo Sang 已提交
238 239


240
class ThreadCoordinator:
    """Drives all worker threads step by step; mostly runs in the main thread.

    Each step: the main thread meets the workers at a shared barrier, does
    house-keeping (aborted-task check, DB state transition) while the workers are
    parked at their per-thread gates, then taps every gate so each worker runs
    one task.
    """
    WORKER_THREAD_TIMEOUT = 60  # one minute

    def __init__(self, pool: ThreadPool, dbManager):
        self._curStep = -1  # first step is 0
        self._pool = pool
        self._te = None  # prepare for every new step
        self._dbManager = dbManager
        self._executedTasks: List[Task] = []  # in a given step
        self._lock = threading.RLock()  # sync access for a few things

        self._stepBarrier = threading.Barrier(
            self._pool.numThreads + 1)  # one barrier for all threads
        self._execStats = ExecutionStats()
        self._runStatus = MainExec.STATUS_RUNNING

    def getTaskExecutor(self):
        return self._te

    def getDbManager(self) -> DbManager:
        return self._dbManager

    def crossStepBarrier(self, timeout=None):
        self._stepBarrier.wait(timeout)

    def requestToStop(self):
        self._runStatus = MainExec.STATUS_STOPPING
        self._execStats.registerFailure("User Interruption")

    def _runShouldEnd(self, transitionFailed, hasAbortedTask, workerTimeout):
        """True once max steps are reached, a stop was requested, or any failure flag is set."""
        maxSteps = gConfig.max_steps  # type: ignore
        if self._curStep >= (maxSteps - 1):  # maxStep==10, last curStep should be 9
            return True
        if self._runStatus != MainExec.STATUS_RUNNING:
            return True
        return bool(transitionFailed or hasAbortedTask or workerTimeout)

    def _hasAbortedTask(self):  # from execution of previous step
        return any(task.isAborted() for task in self._executedTasks)

    def _releaseAllWorkerThreads(self, transitionFailed):
        self._curStep += 1  # we are about to get into next step. TODO: race condition here!
        # Now not all threads had time to go to sleep
        logger.debug(
            "--\r\n\n--> Step {} starts with main thread waking up".format(self._curStep))

        # A new TE for the new step
        self._te = None  # set to empty first, to signal worker thread to stop
        if not transitionFailed:  # only if not failed
            self._te = TaskExecutor(self._curStep)

        logger.debug("[TRD] Main thread waking up at step {}, tapping worker threads".format(
                self._curStep))  # Now not all threads had time to go to sleep
        # Worker threads will wake up at this point, and each execute it's own task
        self.tapAllThreads()  # release all worker thread from their "gate"

    def _syncAtBarrier(self):
        # Now main thread (that's us) is ready to enter a step
        # let other threads go past the pool barrier, but wait at the
        # thread gate
        logger.debug("[TRD] Main thread about to cross the barrier")
        self.crossStepBarrier(timeout=self.WORKER_THREAD_TIMEOUT)
        self._stepBarrier.reset()  # Other worker threads should now be at the "gate"
        logger.debug("[TRD] Main thread finished crossing the barrier")

    def _doTransition(self):
        """End the current step: transition the DB state machine with this step's
        executed tasks, then clear them for the next step.

        Returns True if the transition failed due to a broken DB connection.
        Other taos ProgrammingErrors propagate to the caller.
        """
        transitionFailed = False
        try:
            sm = self._dbManager.getStateMachine()
            logger.debug("[STT] starting transitions")
            # at end of step, transiton the DB state
            sm.transition(self._executedTasks)
            logger.debug("[STT] transition ended")
            # Due to limitation (or maybe not) of the Python library,
            # we cannot share connections across threads
            if sm.hasDatabase():
                for t in self._pool.threadList:
                    logger.debug("[DB] use db for all worker threads")
                    t.useDb()
        except taos.error.ProgrammingError as err:
            if (err.msg == 'network unavailable'):  # broken DB connection
                logger.info("DB connection broken, execution failed")
                traceback.print_stack()
                transitionFailed = True
                self._te = None  # Not running any more
                self._execStats.registerFailure("Broken DB Connection")
                # don't bail out early: threads must still be tapped at the end
            else:
                raise

        # An early `return` used to sit here, making the reset below unreachable,
        # so executed tasks accumulated across every step.
        self.resetExecutedTasks()  # clear the tasks after we are done
        # Get ready for next step
        logger.debug("<-- Step {} finished, trasition failed = {}".format(self._curStep, transitionFailed))
        return transitionFailed

    def run(self):
        """Main coordination loop: run steps until the run should end, then stop all workers."""
        self._pool.createAndStartThreads(self)

        # Coordinate all threads step by step
        self._curStep = -1  # not started yet

        self._execStats.startExec()  # start the stop watch
        transitionFailed = False
        hasAbortedTask = False
        workerTimeout = False
        while not self._runShouldEnd(transitionFailed, hasAbortedTask, workerTimeout):
            if not gConfig.debug:  # print this only if we are not in debug mode
                print(".", end="", flush=True)

            try:
                self._syncAtBarrier()  # For now just cross the barrier
            except threading.BrokenBarrierError:
                logger.info("Main loop aborted, caused by worker thread time-out")
                self._execStats.registerFailure("Aborted due to worker thread timeout")
                print("\n\nWorker Thread time-out detected, important thread info:")
                ts = ThreadStacks()
                ts.print(filterInternal=True)
                workerTimeout = True
                break

            # At this point, all threads should be pass the overall "barrier" and before the per-thread "gate"
            # We use this period to do house keeping work, when all worker
            # threads are QUIET.
            hasAbortedTask = self._hasAbortedTask()  # from previous step
            if hasAbortedTask:
                logger.info("Aborted task encountered, exiting test program")
                self._execStats.registerFailure("Aborted Task Encountered")
                break  # do transition only if tasks are error free

            # Ending previous step
            try:
                transitionFailed = self._doTransition()  # To start, we end step -1 first
            except taos.error.ProgrammingError as err:
                transitionFailed = True
                errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno  # correct error scheme
                errMsg = "Transition failed: errno=0x{:X}, msg: {}".format(errno2, err)
                logger.info(errMsg)
                self._execStats.registerFailure(errMsg)

            # Then we move on to the next step
            self._releaseAllWorkerThreads(transitionFailed)

        if hasAbortedTask or transitionFailed:  # abnormal ending, workers waiting at "gate"
            logger.debug("Abnormal ending of main thraed")
        elif workerTimeout:
            logger.debug("Abnormal ending of main thread, due to worker timeout")
        else:  # regular ending, workers waiting at "barrier"
            logger.debug("Regular ending, main thread waiting for all worker threads to stop...")
            self._syncAtBarrier()

        self._te = None  # No more executor, time to end
        logger.debug("Main thread tapping all threads one last time...")
        self.tapAllThreads()  # Let the threads run one last time

        logger.debug("\r\n\n--> Main thread ready to finish up...")
        logger.debug("Main thread joining all threads")
        self._pool.joinAll()  # Get all threads to finish
        logger.info("\nAll worker threads finished")
        self._execStats.endExec()

    def printStats(self):
        self._execStats.printStats()

    def isFailed(self):
        return self._execStats.isFailed()

    def getExecStats(self):
        return self._execStats

    def tapAllThreads(self):  # in a deterministic manner
        """Tap every worker's gate, in a randomly shuffled order."""
        wakeSeq = []
        for i in range(self._pool.numThreads):  # generate a random sequence
            if Dice.throw(2) == 1:
                wakeSeq.append(i)
            else:
                wakeSeq.insert(0, i)
        logger.debug(
            "[TRD] Main thread waking up worker threads: {}".format(
                str(wakeSeq)))
        # TODO: set dice seed to a deterministic value
        for i in wakeSeq:
            # TODO: maybe a bit too deep?!
            self._pool.threadList[i].tapStepGate()
            time.sleep(0)  # yield

    def isRunning(self):
        return self._te is not None

    def fetchTask(self) -> Task:
        """Create a fresh task of a type picked by the DB state machine."""
        if (not self.isRunning()):  # no task
            raise RuntimeError("Cannot fetch task when not running")
        # pick a task type for current state
        taskType = self.getDbManager().getStateMachine().pickTaskType()
        return taskType(
            self.getDbManager(),
            self._execStats)  # create a task from it

    def resetExecutedTasks(self):
        self._executedTasks = []  # should be under single thread

    def saveExecutedTask(self, task):
        with self._lock:
            self._executedTasks.append(task)
468 469

# We define a class to run a number of threads in lock-step.
S
Shuduo Sang 已提交
470 471


472
class ThreadPool:
    """Holds the worker threads that a ThreadCoordinator drives in lock-step."""

    def __init__(self, numThreads, maxSteps):
        self.numThreads = numThreads
        self.maxSteps = maxSteps
        # Internal bookkeeping
        self.curStep = 0
        self.threadList = []  # type: List[WorkerThread]

    def createAndStartThreads(self, tc: ThreadCoordinator):
        """Create ``numThreads`` workers bound to ``tc``, record and start each one.

        Every worker is expected to block immediately, before step 0.
        """
        for workerId in range(self.numThreads):
            worker = WorkerThread(self, workerId, tc)
            self.threadList.append(worker)  # record BEFORE starting it
            worker.start()

    def joinAll(self):
        """Wait for every worker thread to terminate."""
        for worker in self.threadList:
            logger.debug("Joining thread...")
            worker._thread.join()

492 493
# A queue of contiguous POSITIVE integers, used by DbManager to generate consecutive numbers
# for new table names
S
Shuduo Sang 已提交
494 495


S
Steven Li 已提交
496 497
class LinearQueue():
    """Thread-safe queue of contiguous positive integers [firstIndex..lastIndex].

    Indexes are pushed at the tail, popped from the head, and can be marked
    "in use" so they are neither popped nor handed out twice.
    """

    def __init__(self):
        self.firstIndex = 1  # 1st ever element
        self.lastIndex = 0
        self._lock = threading.RLock()  # our functions may call each other
        self.inUse = set()  # the indexes that are in use right now

    def toText(self):
        return "[{}..{}], in use: {}".format(
            self.firstIndex, self.lastIndex, self.inUse)

    # Push (add new element, largest) to the tail, and mark it in use
    def push(self):
        with self._lock:
            self.lastIndex += 1
            self.allocate(self.lastIndex)
            return self.lastIndex

    def pop(self):
        """Remove and return the head index.

        Returns False when the queue is empty or the head index is in use.
        """
        with self._lock:
            if (self.isEmpty()):
                return False  # TODO: None?

            index = self.firstIndex
            if (index in self.inUse):
                return False

            self.firstIndex += 1
            return index

    def isEmpty(self):
        return self.firstIndex > self.lastIndex

    def popIfNotEmpty(self):
        """Like pop(), but returns 0 (not False) when the queue is empty."""
        with self._lock:
            if (self.isEmpty()):
                return 0
            return self.pop()

    def allocate(self, i):
        """Mark index ``i`` as in use; raises RuntimeError if it already is."""
        with self._lock:
            if (i in self.inUse):
                raise RuntimeError(
                    "Cannot re-use same index in queue: {}".format(i))
            self.inUse.add(i)

    def release(self, i):
        with self._lock:
            self.inUse.remove(i)  # KeyError possible, TODO: why?

    def size(self):
        return self.lastIndex + 1 - self.firstIndex

    def pickAndAllocate(self):
        """Randomly pick a free index in the queue, mark it in use and return it.

        Returns None if the queue is empty or no free index was hit within
        ``size() * 10`` random attempts.
        """
        with self._lock:
            # Checked under the lock: previously this ran before acquiring it,
            # so another thread could change the queue in between.
            if (self.isEmpty()):
                return None
            for _ in range(self.size() * 10):  # bounded number of random attempts
                ret = Dice.throwRange(self.firstIndex, self.lastIndex + 1)
                if (ret not in self.inUse):
                    self.allocate(ret)
                    return ret
            return None

S
Shuduo Sang 已提交
572

573
class DbConn:
    """Abstract database connection used by crash_gen.

    Concrete subclasses (created via ``create``) must override openByType(),
    execute(), query(), getQueryResult(), getResultRows() and getResultCols();
    the versions here only raise RuntimeError.
    """
    TYPE_NATIVE = "native-c"
    TYPE_REST =   "rest-api"
    TYPE_INVALID = "invalid"

    @classmethod
    def create(cls, connType):
        """Factory: return a connection object for ``connType``."""
        if connType == cls.TYPE_NATIVE:
            return DbConnNative()
        if connType == cls.TYPE_REST:
            return DbConnRest()
        raise RuntimeError(
            "Unexpected connection type: {}".format(connType))

    @classmethod
    def createNative(cls):
        return cls.create(cls.TYPE_NATIVE)

    @classmethod
    def createRest(cls):
        return cls.create(cls.TYPE_REST)

    def __init__(self):
        self.isOpen = False
        self._type = self.TYPE_INVALID
        self._lastSql = None  # subclasses record the last SQL attempted here

    def getLastSql(self):
        return self._lastSql

    def open(self):
        """Open the connection once; opening an already-open one is an error."""
        if self.isOpen:
            raise RuntimeError("Cannot re-open an existing DB connection")
        self.openByType()  # the actual work is delegated to the subclass
        logger.debug("[DB] data connection opened, type = {}".format(self._type))
        self.isOpen = True

    def resetDb(self):  # reset the whole database, etc.
        """Drop the test database ``db`` if it exists; requires an open connection."""
        if not self.isOpen:
            raise RuntimeError(
                "Cannot reset database until connection is open")
        self.execute('drop database if exists db')
        logger.debug("Resetting DB, dropped database")

    def queryScalar(self, sql) -> int:
        return self._queryAny(sql)

    def queryString(self, sql) -> str:
        return self._queryAny(sql)

    def _queryAny(self, sql):
        """Run ``sql`` and return the only cell of its 1x1 result set."""
        if not self.isOpen:
            raise RuntimeError("Cannot query database until connection is open")
        rowCount = self.query(sql)
        if rowCount != 1:
            raise RuntimeError("Unexpected result for query: {}, rows = {}".format(sql, rowCount))
        isSingleCell = self.getResultRows() == 1 and self.getResultCols() == 1
        if not isSingleCell:
            raise RuntimeError("Unexpected result set for query: {}".format(sql))
        firstRow = self.getQueryResult()[0]
        return firstRow[0]

    def use(self, dbName):
        self.execute("use {}".format(dbName))

    def hasDatabases(self):
        dbCount = self.query("show databases")
        return dbCount > 0

    def hasTables(self):
        tableCount = self.query("show tables")
        return tableCount > 0

    @staticmethod
    def _mustOverride():
        # Shared failure for the "abstract" methods below.
        raise RuntimeError("Unexpected execution, should be overriden")

    def execute(self, sql):
        self._mustOverride()

    def query(self, sql) -> int:  # return num rows returned
        self._mustOverride()

    def openByType(self):
        self._mustOverride()

    def getQueryResult(self):
        self._mustOverride()

    def getResultRows(self):
        self._mustOverride()

    def getResultCols(self):
        self._mustOverride()

# Sample: curl -u root:taosdata -d "show databases" localhost:6020/rest/sql
S
Shuduo Sang 已提交
670 671


672 673 674 675
class DbConnRest(DbConn):
    """DbConn implementation that sends SQL to the TDengine REST endpoint over HTTP.

    Sample: curl -u root:taosdata -d "show databases" localhost:6020/rest/sql
    """

    def __init__(self):
        super().__init__()
        self._type = self.TYPE_REST
        self._url = "http://localhost:6020/rest/sql"  # fixed for now
        self._result = None  # parsed JSON of the last successful response

    def openByType(self):  # Open connection
        pass  # do nothing, always open

    def close(self):
        if (not self.isOpen):
            raise RuntimeError(
                "Cannot clean up database until connection is open")
        # Do nothing for REST
        logger.debug("[DB] REST Database connection closed")
        self.isOpen = False

    def _doSql(self, sql):
        """POST ``sql`` to the REST endpoint and return the reported row count.

        Raises taos.error.ProgrammingError for server-reported errors and
        RuntimeError for malformed responses.
        """
        self._lastSql = sql  # remember this, last SQL attempted
        try:
            # NOTE(review): no timeout is set, so a hung server blocks this thread
            r = requests.post(self._url,
                              data=sql,
                              auth=HTTPBasicAuth('root', 'taosdata'))
        except Exception:  # re-raised below; no longer intercepts KeyboardInterrupt
            print("REST API Failure (TODO: more info here)")
            raise
        rj = r.json()
        # Sanity check for the "Json Result"
        if ('status' not in rj):
            raise RuntimeError("No status in REST response")

        if rj['status'] == 'error':  # clearly reported error
            if ('code' not in rj):  # error without code
                raise RuntimeError("REST error return without code")
            errno = rj['code']  # May need to massage this in the future
            # A missing 'desc' used to surface as a KeyError instead of this error
            raise taos.error.ProgrammingError(
                rj.get('desc', "REST error without description"), errno)

        if rj['status'] != 'succ':  # better be this
            raise RuntimeError(
                "Unexpected REST return status: {}".format(
                    rj['status']))

        nRows = rj['rows'] if ('rows' in rj) else 0
        self._result = rj
        return nRows

    def execute(self, sql):
        if (not self.isOpen):
            raise RuntimeError(
                "Cannot execute database commands until connection is open")
        logger.debug("[SQL-REST] Executing SQL: {}".format(sql))
        nRows = self._doSql(sql)
        logger.debug(
            "[SQL-REST] Execution Result, nRows = {}, SQL = {}".format(nRows, sql))
        return nRows

    def query(self, sql):  # return rows affected
        return self.execute(sql)

    def getQueryResult(self):
        return self._result['data']

    def _requireResult(self):
        # Guard for the result accessors below: no successful request yet.
        if self._result is None:
            raise RuntimeError("No REST query result available")
        return self._result

    def getResultRows(self):
        """Number of rows in the last result set (length of its 'data' array).

        Previously a "TBD" stub, which made queryScalar()/queryString() always fail over REST.
        """
        return len(self._requireResult().get('data') or [])

    def getResultCols(self):
        """Number of columns in the last result set.

        Uses the 'head' (column names) array; falls back to the width of the
        first data row when 'head' is absent.
        """
        result = self._requireResult()
        head = result.get('head')
        if head is not None:
            return len(head)
        data = result.get('data') or []
        return len(data[0]) if data else 0
S
Shuduo Sang 已提交
745

746
    # Duplicate code from TDMySQL, TODO: merge all this into DbConnNative
747 748


749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776
class MyTDSql:
    """Thin wrapper around a DB-API style cursor that remembers its last result.

    After each call it keeps the SQL text, the fetched rows, the row and
    column counts, and the affected-row count, so callers can read them back.
    Exceptions raised by the cursor propagate to the caller unchanged.
    """

    def __init__(self):
        self.cursor = None       # attached by init()
        self.sql = None          # last SQL statement sent
        self.queryResult = []    # rows fetched by the last query()
        self.queryRows = 0
        self.queryCols = 0
        self.affectedRows = 0

    def init(self, cursor, log=True):
        """Attach ``cursor``. ``log`` is accepted for compatibility but unused."""
        self.cursor = cursor

    def close(self):
        """Close the attached cursor."""
        self.cursor.close()

    def query(self, sql):
        """Execute ``sql``, fetch every row, and return the number of rows fetched."""
        self.sql = sql
        self.cursor.execute(sql)
        self.queryResult = self.cursor.fetchall()
        self.queryRows = len(self.queryResult)
        # DB-API cursors report description=None for statements without a result set
        description = self.cursor.description
        self.queryCols = len(description) if description else 0
        return self.queryRows

    def execute(self, sql):
        """Execute ``sql`` and return what the cursor's execute() returned (affected rows)."""
        self.sql = sql
        self.affectedRows = self.cursor.execute(sql)
        return self.affectedRows

S
Shuduo Sang 已提交
789

790
class DbConnNative(DbConn):
    """DbConn implementation that talks to taosd through the native ``taos`` client."""

    # Class variables, shared by every native connection
    _lock = threading.Lock()  # serializes connection opening across threads
    _connInfoDisplayed = False  # connection details are logged only once

    def __init__(self):
        super().__init__()
        self._type = self.TYPE_NATIVE
        self._conn = None
        self._cursor = None

    def getBuildPath(self):
        """Locate the build directory, i.e. the parent of ``build/bin/taosd``.

        Walks the project tree that starts just above the "community" (or
        "tests") directory containing this script, skipping "packaging" copies.

        Raises:
            RuntimeError: if no suitable ``taosd`` binary is found.
        """
        selfPath = os.path.dirname(os.path.realpath(__file__))
        if "community" in selfPath:
            projPath = selfPath[:selfPath.find("community")]
        else:
            testsIdx = selfPath.find("tests")
            # find() returns -1 when absent; slicing with it would chop off the last char
            projPath = selfPath[:testsIdx] if testsIdx >= 0 else selfPath

        buildPath = None
        for root, dirs, files in os.walk(projPath):
            if "taosd" in files:
                rootRealPath = os.path.dirname(os.path.realpath(root))
                if "packaging" not in rootRealPath:
                    # root is expected to look like <buildPath>/build/bin
                    buildPath = root[:len(root) - len("/build/bin")]
                    break
        if buildPath is None:
            raise RuntimeError("Failed to determine buildPath, selfPath={}".format(selfPath))
        return buildPath

    def openByType(self):  # Open connection
        """Open the native connection and cursor, then wrap the cursor in MyTDSql."""
        cfgPath = self.getBuildPath() + "/test/cfg"
        hostAddr = "127.0.0.1"

        with self._lock:  # force single threading for opening DB connections
            if not self._connInfoDisplayed:
                self.__class__._connInfoDisplayed = True  # updating CLASS variable
                logger.info("Initiating TAOS native connection to {}, using config at {}".format(hostAddr, cfgPath))

            self._conn = taos.connect(host=hostAddr, config=cfgPath)  # TODO: make configurable
            self._cursor = self._conn.cursor()

        self._cursor.execute('reset query cache')
        # self._cursor.execute('use db') # do this at the beginning of every

        # Open connection
        self._tdSql = MyTDSql()
        self._tdSql.init(self._cursor)

    def close(self):
        """Close the cursor and the underlying taos connection.

        Closing the cursor (via MyTDSql) does not touch ``self._conn``, so the
        connection is closed explicitly too. The ``finally`` makes sure this
        happens even if closing the cursor fails.

        Raises:
            RuntimeError: if the connection is not open.
        """
        if not self.isOpen:
            raise RuntimeError(
                "Cannot clean up database until connection is open")
        try:
            self._tdSql.close()
        finally:
            if self._conn is not None:
                self._conn.close()
        logger.debug("[DB] Database connection closed")
        self.isOpen = False

    def execute(self, sql):
        """Execute a statement and return the affected row count."""
        if not self.isOpen:
            raise RuntimeError(
                "Cannot execute database commands until connection is open")
        logger.debug("[SQL] Executing SQL: {}".format(sql))
        nRows = self._tdSql.execute(sql)
        logger.debug(
            "[SQL] Execution Result, nRows = {}, SQL = {}".format(
                nRows, sql))
        return nRows

    def query(self, sql):  # return rows affected
        """Run a query and return the number of rows fetched; see getQueryResult()."""
        if not self.isOpen:
            raise RuntimeError(
                "Cannot query database until connection is open")
        logger.debug("[SQL] Executing SQL: {}".format(sql))
        nRows = self._tdSql.query(sql)
        logger.debug(
            "[SQL] Query Result, nRows = {}, SQL = {}".format(
                nRows, sql))
        return nRows
        # results are in: return self._tdSql.queryResult

    def getQueryResult(self):
        return self._tdSql.queryResult

    def getResultRows(self):
        return self._tdSql.queryRows

    def getResultCols(self):
        return self._tdSql.queryCols
879

S
Shuduo Sang 已提交
880

881
class AnyState:
    """Base class for the database states tracked by the crash-gen state machine.

    Subclasses implement getInfo(), which returns a list. Slot STATE_VAL_IDX
    holds the state value; the slots named by the CAN_* constants hold the
    capability flags. The assert* helpers check a batch of executed tasks
    against expectations and raise when those expectations are violated.
    """
    STATE_INVALID = -1
    STATE_EMPTY = 0  # nothing there, no even a DB
    STATE_DB_ONLY = 1  # we have a DB, but nothing else
    STATE_TABLE_ONLY = 2  # we have a table, but totally empty
    STATE_HAS_DATA = 3  # we have some data in the table
    _stateNames = ["Invalid", "Empty", "DB_Only", "Table_Only", "Has_Data"]

    # Indices into the list returned by getInfo()
    STATE_VAL_IDX = 0
    CAN_CREATE_DB = 1
    CAN_DROP_DB = 2
    CAN_CREATE_FIXED_SUPER_TABLE = 3
    CAN_DROP_FIXED_SUPER_TABLE = 4
    CAN_ADD_DATA = 5
    CAN_READ_DATA = 6

    def __init__(self):
        self._info = self.getInfo()

    def __str__(self):
        # State values start at -1 (STATE_INVALID), so shift by one to index the names
        stateVal = self._info[self.STATE_VAL_IDX]
        return self._stateNames[stateVal + 1]

    def getInfo(self):
        raise RuntimeError("Must be overriden by child classes")

    def equals(self, other):
        """Compare with another AnyState or with a raw state value (int)."""
        if isinstance(other, int):
            otherVal = other
        elif isinstance(other, AnyState):
            otherVal = other.getValIndex()
        else:
            raise RuntimeError(
                "Unexpected comparison, type = {}".format(
                    type(other)))
        return self.getValIndex() == otherVal

    def verifyTasksToState(self, tasks, newState):
        raise RuntimeError("Must be overriden by child classes")

    def getValIndex(self):
        return self._info[self.STATE_VAL_IDX]

    def getValue(self):
        return self._info[self.STATE_VAL_IDX]

    def _capability(self, idx):
        # Read one capability flag from the getInfo() list
        return self._info[idx]

    def canCreateDb(self):
        return self._capability(self.CAN_CREATE_DB)

    def canDropDb(self):
        return self._capability(self.CAN_DROP_DB)

    def canCreateFixedSuperTable(self):
        return self._capability(self.CAN_CREATE_FIXED_SUPER_TABLE)

    def canDropFixedSuperTable(self):
        return self._capability(self.CAN_DROP_FIXED_SUPER_TABLE)

    def canAddData(self):
        return self._capability(self.CAN_ADD_DATA)

    def canReadData(self):
        return self._capability(self.CAN_READ_DATA)

    def assertAtMostOneSuccess(self, tasks, cls):
        """Raise RuntimeError once a second successful task of type ``cls`` is seen."""
        successCount = 0
        for task in (t for t in tasks if isinstance(t, cls)):
            if not task.isSuccess():
                continue
            successCount += 1
            if successCount > 1:
                raise RuntimeError(
                    "Unexpected more than 1 success with task: {}".format(cls))

    def assertIfExistThenSuccess(self, tasks, cls):
        """Raise RuntimeError if tasks of type ``cls`` exist but none succeeded."""
        matching = [t for t in tasks if isinstance(t, cls)]
        if matching and not any(t.isSuccess() for t in matching):
            raise RuntimeError(
                "Unexpected zero success for task: {}".format(cls))

    def assertNoTask(self, tasks, cls):
        """Raise CrashGenError if any task of type ``cls`` is present."""
        if any(isinstance(t, cls) for t in tasks):
            raise CrashGenError(
                "This task: {}, is not expected to be present, given the success/failure of others".format(cls.__name__))

    def assertNoSuccess(self, tasks, cls):
        """Raise RuntimeError if any task of type ``cls`` succeeded."""
        for task in tasks:
            if isinstance(task, cls) and task.isSuccess():
                raise RuntimeError(
                    "Unexpected successful task: {}".format(cls))

    def hasSuccess(self, tasks, cls):
        """True if at least one task of type ``cls`` succeeded."""
        return any(isinstance(t, cls) and t.isSuccess() for t in tasks)

    def hasTask(self, tasks, cls):
        """True if at least one task of type ``cls`` is present."""
        return any(isinstance(t, cls) for t in tasks)

S
Shuduo Sang 已提交
996

997 998 999 1000
class StateInvalid(AnyState):
    """State value STATE_INVALID, with every capability disabled."""

    def getInfo(self):
        # The state value, followed by six False flags:
        # create/drop Db, create/drop fixed table, insert/read data
        return [self.STATE_INVALID] + [False] * 6

    # def verifyTasksToState(self, tasks, newState):

S
Shuduo Sang 已提交
1008

1009 1010 1011 1012
class StateEmpty(AnyState):
    """No database exists; the only permitted capability is creating one."""

    def getInfo(self):
        canCreateDb, canDropDb = True, False
        canCreateTable, canDropTable = False, False
        canAddData, canReadData = False, False
        return [
            self.STATE_EMPTY,
            canCreateDb, canDropDb,
            canCreateTable, canDropTable,
            canAddData, canReadData,
        ]

    def verifyTasksToState(self, tasks, newState):
        """Starting from EMPTY: if a DB creation succeeded and no drop_db task
        ran, creation must have succeeded at most once."""
        if not self.hasSuccess(tasks, TaskCreateDb):
            return
        if self.hasTask(tasks, TaskDropDb):
            return
        # TODO: compare numbers
        self.assertAtMostOneSuccess(tasks, TaskCreateDb)

1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035

class StateDbOnly(AnyState):
    """A database exists, with no tables in it."""

    def getInfo(self):
        return [
            self.STATE_DB_ONLY,
            False, True,   # can create/drop Db
            True, False,   # can create/drop fixed super table
            False, False,  # can insert/read data with fixed table
        ]

    def verifyTasksToState(self, tasks, newState):
        """Without any create_db task, at most one drop_db may have succeeded."""
        # TODO: restore the below, the problem exists, although unlikely in real-world
        # if (gSvcMgr!=None) and gSvcMgr.isRestarting():
        # if (gSvcMgr == None) or (not gSvcMgr.isRestarting()) :
        #     self.assertIfExistThenSuccess(tasks, TaskDropDb)
        if self.hasTask(tasks, TaskCreateDb):
            return  # only if we don't create any more
        self.assertAtMostOneSuccess(tasks, TaskDropDb)
1044

S
Shuduo Sang 已提交
1045

1046
class StateSuperTableOnly(AnyState):
    """State value STATE_TABLE_ONLY.

    The DB and the fixed super table may be dropped; data may be added and read.
    """

    def getInfo(self):
        return [
            self.STATE_TABLE_ONLY,
            False, True,  # can create/drop Db
            False, True,  # can create/drop fixed super table
            True, True,   # can insert/read data with fixed table
        ]

    def verifyTasksToState(self, tasks, newState):
        """Check the tasks that moved us from this state to ``newState``.

        Currently no assertions are made. The earlier body called hasSuccess()
        and discarded the result, so it never verified anything either.
        TODO: need to revamp!!
        """
        # Candidate checks, disabled because they do not hold in massively
        # parallel runs (tables may be re-created or dropped concurrently):
        # - TaskDropSuperTable succeeded => TaskCreateSuperTable must have re-created it
        # - TaskAddData succeeded (no drop success) => no TaskDropSuperTable present
        # - only reads succeeded => no TaskDropSuperTable / TaskAddData present
1073

S
Shuduo Sang 已提交
1074

1075 1076 1077 1078 1079 1080 1081 1082 1083 1084
class StateHasData(AnyState):
    """State value STATE_HAS_DATA.

    The DB and the fixed super table may be dropped; data may be added and read.
    """

    def getInfo(self):
        return [
            self.STATE_HAS_DATA,
            False, True,  # can create/drop Db
            False, True,  # can create/drop fixed super table
            True, True,   # can insert/read data with fixed table
        ]

    def verifyTasksToState(self, tasks, newState):
        """Check that ``tasks`` can explain moving from HAS_DATA to ``newState``.

        Raises:
            RuntimeError / CrashGenError: via the assert* helpers when an expectation fails.
        """
        if newState.equals(AnyState.STATE_EMPTY):
            # DB is gone: unless a DB was re-created, at most one drop may have succeeded
            if not self.hasTask(tasks, TaskCreateDb):
                self.assertAtMostOneSuccess(tasks, TaskDropDb)  # TODO: dicy
        elif newState.equals(AnyState.STATE_DB_ONLY):
            # DB survived: without a create_db task there must NOT have been a drop_db task
            if not self.hasTask(tasks, TaskCreateDb):
                self.assertNoTask(tasks, TaskDropDb)
            # self.assertAtMostOneSuccess(tasks, DropFixedSuperTableTask) # TODO: dicy
        else:
            # Any other new state (expected HAS_DATA; TABLE_ONLY is not special-cased)
            if not self.hasTask(tasks, TaskCreateDb):
                # DB was not re-created, so we shouldn't have dropped it
                self.assertNoTask(tasks, TaskDropDb)
            if not self.hasTask(tasks, TaskCreateSuperTable):
                # table was not re-created, so no task should have dropped it
                self.assertNoTask(tasks, TaskDropSuperTable)
S
Steven Li 已提交
1111

S
Shuduo Sang 已提交
1112

1113
class StateMechine:
    """Tracks the current database state and picks the next task type to run.

    The state is derived by querying the DB (_findCurrentState). After each
    step, transition() re-derives it and asks the previous state to verify that
    the executed tasks can explain the change.
    """

    def __init__(self, dbConn):
        self._dbConn = dbConn
        self._curState = self._findCurrentState()  # starting state
        # transitition target probabilities, indexed with value of STATE_EMPTY,
        # STATE_DB_ONLY, etc.
        self._stateWeights = [1, 2, 10, 40]

    def getCurrentState(self):
        return self._curState

    def hasDatabase(self):
        return self._curState.canDropDb()  # ha, can drop DB means it has one

    # May be slow, use cautionsly...
    def getTaskTypes(self):  # those that can run (directly/indirectly) from the current state
        """Return the task classes runnable from the current state.

        A class qualifies if it can run directly, or after one state-changing
        direct task. Each class appears at most once, so pickTaskType() weighs
        every candidate exactly once.

        Raises:
            RuntimeError: if no task type can run from the current state.
        """
        allTaskClasses = StateTransitionTask.__subclasses__()  # all state transition tasks

        # Tasks that can begin directly from the current state
        firstTaskTypes = [tc for tc in allTaskClasses if tc.canBeginFrom(self._curState)]

        # Now the INDIRECT ones: tasks that can begin from a direct task's end state
        taskTypes = firstTaskTypes.copy()  # have to have these
        for task1 in firstTaskTypes:
            endState = task1.getEndState()
            if endState is None:  # does not change end state
                continue
            for tc in allTaskClasses:
                # Check against taskTypes (not just firstTaskTypes) so a class
                # reachable from several direct tasks is not added, and weighted, twice
                if tc.canBeginFrom(endState) and (tc not in taskTypes):
                    taskTypes.append(tc)

        if len(taskTypes) <= 0:
            raise RuntimeError(
                "No suitable task types found for state: {}".format(
                    self._curState))
        logger.debug(
            "[OPS] Tasks found for state {}: {}".format(
                self._curState,
                [t.__name__ for t in taskTypes]))
        return taskTypes

    def _findCurrentState(self):
        """Query the DB and return a fresh AnyState subclass instance for it."""
        dbc = self._dbConn
        ts = time.time()  # we use this to debug how fast/slow it is to do the various queries to find the current DB state
        if not dbc.hasDatabases():  # no database?!
            logger.debug( "[STT] empty database found, between {} and {}".format(ts, time.time()))
            return StateEmpty()
        # did not do this when openning connection, and this is NOT the worker
        # thread, which does this on their own
        dbc.use("db")
        if not dbc.hasTables():  # no tables
            logger.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time()))
            return StateDbOnly()

        sTable = DbManager.getFixedSuperTable()
        # NOTE(review): a truthy hasRegTables() maps to SUPER_TABLE_ONLY here, which
        # looks inverted relative to the state names; confirm against
        # TdSuperTable.hasRegTables before changing it
        if sTable.hasRegTables(dbc):
            logger.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time()))
            return StateSuperTableOnly()
        else:
            logger.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time()))
            return StateHasData()

    def transition(self, tasks):
        """Re-derive the DB state after ``tasks`` ran and verify the transition."""
        if len(tasks) == 0:  # before 1st step, or otherwise empty
            logger.debug("[STT] Starting State: {}".format(self._curState))
            return  # do nothing

        # this should show up in the server log, separating steps
        self._dbConn.execute("show dnodes")

        # Generic Checks, first based on the start state
        if self._curState.canCreateDb():
            self._curState.assertIfExistThenSuccess(tasks, TaskCreateDb)
            # Not asserting at-most-one success: creates and drops may interleave

        if self._curState.canDropDb():
            if gSvcMgr is None:  # only if we are running as client-only
                self._curState.assertIfExistThenSuccess(tasks, TaskDropDb)
            # Not asserting at-most-one success: drop-create-drop is possible

        # No generic checks for fixed-table creation/drop, data adds or reads:
        # the whole DB may be dropped concurrently, so none of them hold for sure.

        newState = self._findCurrentState()
        logger.debug("[STT] New DB state determined: {}".format(newState))
        # can old state move to new state through the tasks?
        self._curState.verifyTasksToState(tasks, newState)
        self._curState = newState

    def pickTaskType(self):
        """Pick a task class, weighting each by the target state it leads to."""
        # all the task types we can choose from at curent state
        taskTypes = self.getTaskTypes()
        weights = []
        for tt in taskTypes:
            endState = tt.getEndState()
            if endState is not None:
                # TODO: change to a method
                weights.append(self._stateWeights[endState.getValIndex()])
            else:
                # read data task, default to 10: TODO: change to a constant
                weights.append(10)
        i = self._weighted_choice_sub(weights)
        return taskTypes[i]

    # ref:
    # https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/
    def _weighted_choice_sub(self, weights):
        """Return an index into ``weights``, chosen with probability proportional to its weight.

        Raises:
            ValueError: if ``weights`` is empty or its total is not positive.
        """
        # TODO: use our dice to ensure it being determinstic?
        total = sum(weights)
        if total <= 0:
            raise ValueError("weights must have a positive total, got {}".format(weights))
        rnd = random.random() * total
        for i, w in enumerate(weights):
            rnd -= w
            if rnd < 0:
                return i
        # Floating-point rounding can leave rnd at exactly 0 after the last
        # subtraction; fall back to the last index that has a positive weight
        return max(i for i, w in enumerate(weights) if w > 0)
1251

1252
# Manager of the Database Data/Connection
class DbManager():
    """Owns the DB connection and the state machine, plus the generators
    for table numbers, timestamps, ints, floats and binary strings used by tasks."""

    def __init__(self, resetDb=True):
        self.tableNumQueue = LinearQueue()
        # datetime.datetime(2019, 1, 1) # initial date time tick
        self._lastTick = self.setupLastTick()
        self._lastInt = 0  # next one is initial integer
        self._lock = threading.RLock()

        # self.openDbServerConnection()
        if gConfig.connector_type == 'native':
            self._dbConn = DbConn.createNative()
        else:
            self._dbConn = DbConn.createRest()

        try:
            self._dbConn.open()  # may throw taos.error.ProgrammingError: disconnected
        except taos.error.ProgrammingError as err:
            if err.msg != 'client disconnected':
                raise
            # cannot open DB connection
            print(
                "Cannot establish DB connection, please re-run script without parameter, and follow the instructions.")
            sys.exit(2)
        except BaseException:
            print("[=] Unexpected exception")
            raise

        if resetDb:
            self._dbConn.resetDb()  # drop and recreate DB

        # Do this after dbConn is in proper shape
        self._stateMachine = StateMechine(self._dbConn)

    def getDbConn(self):
        return self._dbConn

    def getStateMachine(self) -> StateMechine:
        return self._stateMachine

    # We aim to create a starting time tick, such that, whenever we run our test here once
    # We should be able to safely create 100,000 records, which will not have any repeated time stamp
    # when we re-run the test in 3 minutes (180 seconds), basically we should expand time duration
    # by a factor of 500.
    # TODO: what if it goes beyond 10 years into the future
    # TODO: fix the error as result of above: "tsdb timestamp is out of range"
    def setupLastTick(self):
        origin = datetime.datetime(2020, 6, 1)
        now = datetime.datetime.now()
        # maybe a very large number, takes 69 years to exceed Python int range
        elapsedSec = int(now.timestamp() - origin.timestamp())
        period = 8 * 12 * 30 * 24 * 60 * 60 / 500
        offsetSec = (elapsedSec % period) * 500  # a number representing seconds within 10 years
        keepStart = datetime.datetime(2012, 1, 1)  # default "keep" is 10 years
        startTick = datetime.datetime.fromtimestamp(
            keepStart.timestamp() + offsetSec)  # see explanation above
        logger.info("Setting up TICKS to start from: {}".format(startTick))
        return startTick

    def pickAndAllocateTable(self):  # pick any table, and "use" it
        return self.tableNumQueue.pickAndAllocate()

    def addTable(self):
        with self._lock:
            tIndex = self.tableNumQueue.push()
        return tIndex

    @classmethod
    def getFixedSuperTableName(cls):
        return "fs_table"

    @classmethod
    def getFixedSuperTable(cls):
        return TdSuperTable(cls.getFixedSuperTableName())

    def releaseTable(self, i):  # return the table back, so others can use it
        self.tableNumQueue.release(i)

    def getNextTick(self):
        """Advance the tick by 1s; with a 1-in-20 chance return a tick 100s in the past instead."""
        with self._lock:  # prevent duplicate tick
            if Dice.throw(20) == 0:  # 1 in 20 chance
                # Go back in time 100 seconds, leaving the stored tick untouched
                return self._lastTick - datetime.timedelta(seconds=100)
            self._lastTick += datetime.timedelta(seconds=1)
            return self._lastTick

    def getNextInt(self):
        with self._lock:
            self._lastInt += 1
            return self._lastInt

    def getNextBinary(self):
        suffix = self.getNextInt()
        return "Beijing_Shanghai_Los_Angeles_New_York_San_Francisco_Chicago_Beijing_Shanghai_Los_Angeles_New_York_San_Francisco_Chicago_{}".format(
            suffix)

    def getNextFloat(self):
        nextInt = self.getNextInt()
        return 0.9 + nextInt

    def getTableNameToDelete(self):
        """Pop a table number and return its name, or False when none is available."""
        tblNum = self.tableNumQueue.pop()  # TODO: race condition!
        return "table_{}".format(tblNum) if tblNum else False

    def cleanUp(self):
        self._dbConn.close()

1367

1368
class TaskExecutor():
    """Runs a task on a worker thread for a given step, and records data marks."""

    class BoundedList:
        """Thread-safe sorted list that keeps at most ``size`` of the largest values added."""

        def __init__(self, size=10):
            self._size = size
            self._list = []
            self._lock = threading.Lock()

        def add(self, n: int):
            """Insert ``n`` in sorted position, evicting the smallest value when over capacity.

            A value smaller than everything retained is dropped only when the
            list is already full. Previously it was always dropped, so the first
            value ever added acted as a permanent lower gate.
            """
            with self._lock:
                # first index whose value is >= n (i.e. bisect_left on the sorted list)
                insPos = next((i for i, v in enumerate(self._list) if n <= v), len(self._list))
                if insPos == 0 and len(self._list) >= self._size:
                    return  # smaller than everything kept, and no room left
                self._list.insert(insPos, n)
                if len(self._list) > self._size:
                    del self._list[0]  # evict the smallest value

        def __str__(self):
            return repr(self._list)

    _boundedList = BoundedList()  # shared by all TaskExecutor instances

    def __init__(self, curStep):
        self._curStep = curStep

    @classmethod
    def getBoundedList(cls):
        return cls._boundedList

    def getCurStep(self):
        return self._curStep

    def execute(self, task: Task, wt: WorkerThread):  # execute a task on a thread
        task.execute(wt)

    def recordDataMark(self, n: int):
        """Record ``n`` in the shared bounded list of data marks."""
        # print("[{}]".format(n), end="", flush=True)
        self._boundedList.add(n)
1430

S
Shuduo Sang 已提交
1431

S
Steven Li 已提交
1432
class Task():
    """Base class for all crash_gen tasks.

    A task is run on a WorkerThread through ``execute``. Subclasses put the
    actual work in ``_executeInternal``. ``execute`` catches and classifies
    any error raised, records it on the task, and updates the shared
    ExecutionStats object.
    """
    taskSn = 100

    @classmethod
    def allocTaskNum(cls):
        """Return the next task serial number (one counter for all subclasses)."""
        Task.taskSn += 1  # IMPORTANT: cannot use cls.taskSn, since each sub class will have a copy
        # logger.debug("Allocating taskSN: {}".format(Task.taskSn))
        return Task.taskSn

    def __init__(self, dbManager: DbManager, execStats: ExecutionStats):
        self._dbManager = dbManager
        self._workerThread = None  # set by execute()
        self._err = None
        self._aborted = False
        self._curStep = None
        self._numRows = None  # Number of rows affected

        # Assign an incremental task serial number
        self._taskNum = self.allocTaskNum()
        # logger.debug("Creating new task {}...".format(self._taskNum))

        self._execStats = execStats

    def isSuccess(self):
        """True when the most recent execution recorded no error."""
        return self._err is None

    def isAborted(self):
        """True when the most recent execution hit an unacceptable error."""
        return self._aborted

    def clone(self):  # TODO: why do we need this again?
        """Return a new task of the same class sharing DB manager and stats."""
        newTask = self.__class__(self._dbManager, self._execStats)
        return newTask

    def logDebug(self, msg):
        """Debug-log ``msg`` through the worker thread; requires execute() to have run."""
        self._workerThread.logDebug(
            "Step[{}.{}] {}".format(
                self._curStep, self._taskNum, msg))

    def logInfo(self, msg):
        """Info-log ``msg`` through the worker thread; requires execute() to have run."""
        self._workerThread.logInfo(
            "Step[{}.{}] {}".format(
                self._curStep, self._taskNum, msg))

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        raise RuntimeError(
            "To be implemeted by child classes, class name: {}".format(
                self.__class__.__name__))

    def _isErrAcceptable(self, errno, msg):
        """Decide whether a TAOS error is expected noise rather than a real bug.

        :param errno: corrected (non-negative) TAOS error number
        :param msg: error message text, inspected for some error numbers
        :return: True if the error should be tolerated
        """
        if errno in [
                0x05,  # TSDB_CODE_RPC_NOT_READY
                # 0x200, # invalid SQL, TODO: re-examine with TD-934
                0x360, 0x362,
                0x369,  # tag already exists
                0x36A, 0x36B, 0x36D,
                0x381,
                0x380,  # "db not selected"
                0x383,
                0x386,  # DB is being dropped?!
                0x503,
                0x510,  # vnode not in ready state
                0x600,
                1000  # REST catch-all error
            ]:
            return True  # These are the ALWAYS-ACCEPTABLE ones
        elif errno == 0x0B and gConfig.auto_start_service:
            return True  # We may get "network unavailable" when restarting service
        elif errno == 0x200:  # invalid SQL, we need to dig in a bit more
            if "invalid column name" in msg:
                return True
            elif "tags number not matched" in msg:  # mismatched tags after modification
                return True
            elif "duplicated column names" in msg:  # also alter table tag issues
                return True
        elif (gSvcMgr is not None) and gSvcMgr.isRestarting():
            logger.info("Ignoring error when service is restarting: errno = {}, msg = {}".format(errno, msg))
            return True

        return False  # Not an acceptable error

    def execute(self, wt: WorkerThread):
        """Execute this task on worker thread ``wt``.

        Errors raised by ``_executeInternal`` are caught and never propagate.
        * TAOS ``ProgrammingError`` is always recorded in ``self._err``. The
          task is also marked aborted unless ``gConfig.continue_on_exception``
          is set or ``_isErrAcceptable`` accepts the error.
        * Any other exception, including BaseException subclasses such as
          KeyboardInterrupt, is recorded and marks the task aborted.
        Execution statistics are updated in every case.
        """
        wt.verifyThreadSelf()
        self._workerThread = wt  # type: ignore

        te = wt.getTaskExecutor()
        self._curStep = te.getCurStep()
        klassName = self.__class__.__name__
        self.logDebug(
            "[-] executing task {}...".format(klassName))

        self._err = None
        self._execStats.beginTaskType(klassName)  # mark beginning
        errno2 = None
        try:
            self._executeInternal(te, wt)  # TODO: no return value?
        except taos.error.ProgrammingError as err:
            errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno  # correct error scheme
            if gConfig.continue_on_exception:  # user choose to continue
                self.logDebug("[=] Continue after TAOS exception: errno=0x{:X}, msg: {}, SQL: {}".format(
                    errno2, err, wt.getDbConn().getLastSql()))
                self._err = err
            elif self._isErrAcceptable(errno2, str(err)):
                self.logDebug("[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(
                    errno2, err, wt.getDbConn().getLastSql()))
                print("_", end="", flush=True)
                self._err = err
            else:  # not an acceptable error
                errMsg = "[=] Unexpected Taos library exception ({}): errno=0x{:X}, msg: {}, SQL: {}".format(
                    klassName,
                    errno2, err, wt.getDbConn().getLastSql())
                self.logDebug(errMsg)
                if gConfig.debug:
                    # raise # so that we see full stack
                    traceback.print_exc()
                print(
                    "\n\n----------------------------\nProgram ABORTED Due to Unexpected TAOS Error: \n\n{}\n".format(errMsg) +
                    "----------------------------\n")
                # sys.exit(-1)
                self._err = err
                self._aborted = True
        except Exception as e:
            self.logInfo("Non-TAOS exception encountered")
            self._err = e
            self._aborted = True
            traceback.print_exc()
        except BaseException as e:
            # Deliberately swallowed too: the task is marked aborted instead of
            # letting e.g. KeyboardInterrupt unwind the worker thread. This is
            # the last clause, since nothing can be caught after BaseException.
            self.logInfo("Python base exception encountered")
            self._err = e
            self._aborted = True
            traceback.print_exc()

        self._execStats.endTaskType(klassName, self.isSuccess())

        self.logDebug("[X] task execution completed, {}, status: {}".format(
            klassName, "Success" if self.isSuccess() else "Failure"))
        # TODO: merge with above.
        self._execStats.incExecCount(klassName, self.isSuccess(), errno2)

    def execSql(self, sql):
        """Execute ``sql`` through the DB manager (not the worker's connection)."""
        return self._dbManager.execute(sql)

    def execWtSql(self, wt: WorkerThread, sql):  # execute an SQL on the worker thread
        return wt.execSql(sql)

    def queryWtSql(self, wt: WorkerThread, sql):  # execute an SQL on the worker thread
        return wt.querySql(sql)

    def getQueryResult(self, wt: WorkerThread):  # execute an SQL on the worker thread
        return wt.getQueryResult()


class ExecutionStats:
    """Collect per-task-type execution counts, error counts and timing data.

    Task.execute() reports into this object from worker threads, so the
    counters are updated under ``self._lock``.
    """

    def __init__(self):
        # task class name -> [total executions, successful executions]
        self._execTimes: Dict[str, List[int]] = {}
        self._tasksInProgress = 0
        self._lock = threading.Lock()
        self._firstTaskStartTime = None
        self._execStartTime = None
        # task class name -> {errno: number of occurrences}
        self._errors: Dict[str, Dict[int, int]] = {}
        self._elapsedTime = 0.0  # total elapsed time
        self._accRunTime = 0.0  # accumulated run time

        self._failed = False
        self._failureReason = None

    def __str__(self):
        return "[ExecStats: _failed={}, _failureReason={}]".format(
            self._failed, self._failureReason)

    def isFailed(self):
        return self._failed

    def startExec(self):
        """Mark the wall-clock start of the whole run."""
        self._execStartTime = time.time()

    def endExec(self):
        """Record wall-clock time elapsed since startExec()."""
        self._elapsedTime = time.time() - self._execStartTime

    def incExecCount(self, klassName, isSuccess, eno=None):
        """Count one execution of ``klassName``.

        :param klassName: task class name
        :param isSuccess: whether the execution succeeded
        :param eno: corrected TAOS error number, or None when there was none
        """
        with self._lock:  # worker threads report concurrently via Task.execute()
            counts = self._execTimes.setdefault(klassName, [0, 0])
            counts[0] += 1  # index 0 has the "total" execution times
            if isSuccess:
                counts[1] += 1  # index 1 has the "success" execution times
            if eno is not None:
                errors = self._errors.setdefault(klassName, {})
                errors[eno] = errors.get(eno, 0) + 1

    def beginTaskType(self, klassName):
        with self._lock:
            if self._tasksInProgress == 0:  # starting a new round
                self._firstTaskStartTime = time.time()  # I am now the first task
            self._tasksInProgress += 1

    def endTaskType(self, klassName, isSuccess):
        with self._lock:
            self._tasksInProgress -= 1
            if self._tasksInProgress == 0:  # all tasks have stopped
                self._accRunTime += (time.time() - self._firstTaskStartTime)
                self._firstTaskStartTime = None

    def registerFailure(self, reason):
        self._failed = True
        self._failureReason = reason

    def printStats(self):
        """Log a summary report of everything collected, via ``logger``."""
        logger.info(
            "----------------------------------------------------------------------")
        logger.info(
            "| Crash_Gen test {}, with the following stats:". format(
                "FAILED (reason: {})".format(
                    self._failureReason) if self._failed else "SUCCEEDED"))
        logger.info("| Task Execution Times (success/total):")
        execTimesAny = 0
        for k, n in self._execTimes.items():
            execTimesAny += n[0]
            errStr = None
            if k in self._errors:
                errStr = ", ".join(
                    "0x{:X}:{}".format(eno, cnt) for (eno, cnt) in self._errors[k].items())
            logger.info("|    {0:<24}: {1}/{2} (Errors: {3})".format(k, n[1], n[0], errStr))

        logger.info(
            "| Total Tasks Executed (success or not): {} ".format(execTimesAny))
        logger.info(
            "| Total Tasks In Progress at End: {}".format(
                self._tasksInProgress))
        logger.info(
            "| Total Task Busy Time (elapsed time when any task is in progress): {:.3f} seconds".format(
                self._accRunTime))
        # Avoid ZeroDivisionError when no task was executed at all
        avgTime = (self._accRunTime / execTimesAny) if execTimesAny > 0 else 0.0
        logger.info(
            "| Average Per-Task Execution Time: {:.3f} seconds".format(avgTime))
        logger.info(
            "| Total Elapsed Time (from wall clock): {:.3f} seconds".format(
                self._elapsedTime))
        logger.info(
            "| Top numbers written: {}".format(
                TaskExecutor.getBoundedList()))
        logger.info(
            "----------------------------------------------------------------------")


class StateTransitionTask(Task):
    """Base for tasks whose eligibility depends on the current DB state.

    Subclasses override ``getEndState`` (the state the task leads to, or None
    when it does not change state) and ``canBeginFrom`` (whether the task may
    run from a given state).
    """
    # Workload sizes; subclasses choose via gConfig.larger_data
    LARGE_NUMBER_OF_TABLES = 35
    SMALL_NUMBER_OF_TABLES = 3
    LARGE_NUMBER_OF_RECORDS = 50
    SMALL_NUMBER_OF_RECORDS = 3

    _endState = None

    @classmethod
    def getInfo(cls):  # each sub class should supply their own information
        raise RuntimeError("Overriding method expected")

    @classmethod
    def getEndState(cls):  # TODO: optimize by calling it fewer times
        raise RuntimeError("Overriding method expected")

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        raise RuntimeError("must be overriden")

    @classmethod
    def getRegTableName(cls, i):
        """Name of the i-th regular table, e.g. ``reg_table_3``."""
        return f"reg_table_{i}"

    def execute(self, wt: WorkerThread):
        super().execute(wt)


class TaskCreateDb(StateTransitionTask):
    """Create database ``db``; the system then ends up in StateDbOnly."""

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canCreateDb()

    @classmethod
    def getEndState(cls):
        return StateDbOnly()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        createSql = "create database db"
        self.execWtSql(wt, createSql)

class TaskDropDb(StateTransitionTask):
    """Drop database ``db``; the system then ends up in StateEmpty."""

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canDropDb()

    @classmethod
    def getEndState(cls):
        return StateEmpty()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        dropSql = "drop database db"
        self.execWtSql(wt, dropSql)
        logger.debug("[OPS] database dropped at {}".format(time.time()))


class TaskCreateSuperTable(StateTransitionTask):
    """Create the fixed super table; regular tables are left to TaskAddData."""

    @classmethod
    def getEndState(cls):
        return StateSuperTableOnly()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canCreateFixedSuperTable()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        # Without a DB in use (to the best of our knowledge) there is nothing to do
        if not wt.dbInUse():
            logger.debug("Skipping task, no DB yet")
            return

        columns = {'ts': 'timestamp', 'speed': 'int'}
        tags = {'b': 'binary(200)', 'f': 'float'}
        # Regular tables need not be created here: INSERT does it automatically
        self._dbManager.getFixedSuperTable().create(wt.getDbConn(), columns, tags)


class TdSuperTable:
    """SQL helper around one super table in database ``db``.

    Every method takes the DB connection to use as ``dbc``.
    """

    def __init__(self, stName):
        self._stName = stName

    def create(self, dbc, cols: dict, tags: dict):
        """Create the super table from ``name -> type`` column and tag dicts."""
        colDefs = ",".join("%s %s" % pair for pair in cols.items())
        tagDefs = ",".join("%s %s" % pair for pair in tags.items())
        dbc.execute("CREATE TABLE db.{} ({}) TAGS ({})".format(
            self._stName, colDefs, tagDefs))

    def getRegTables(self, dbc: DbConn):
        """Return names of the regular tables under this super table."""
        sql = "select TBNAME from db.{}".format(self._stName)
        try:
            dbc.query(sql)  # TODO: analyze result set later
        except taos.error.ProgrammingError as err:
            errno2 = err.errno if err.errno > 0 else 0x80000000 + err.errno
            logger.debug("[=] Failed to get tables from super table: errno=0x{:X}, msg: {}".format(errno2, err))
            raise
        return [row[0] for row in dbc.getQueryResult()]

    def hasRegTables(self, dbc: DbConn):
        return dbc.query("SELECT * FROM db.{}".format(self._stName)) > 0

    def ensureTable(self, dbc: DbConn, regTableName: str):
        """Create regular table ``regTableName`` from this super table if missing."""
        probeSql = "select tbname from {} where tbname in ('{}')".format(self._stName, regTableName)
        if dbc.query(probeSql) >= 1:  # reg table exists already
            return
        dbc.execute("CREATE TABLE {} USING {} tags ({})".format(
            regTableName, self._stName, self._getTagStrForSql(dbc)))

    def _getTagStrForSql(self, dbc):
        """Build a comma-separated list of sample tag values, one per current tag."""
        sampleByType = {
            'BINARY': "'Beijing-Shanghai-LosAngeles'",
            'FLOAT': '9.9',
            'INT': '88',
        }
        values = []
        for tagType in self._getTags(dbc).values():
            if tagType not in sampleByType:
                raise RuntimeError("Unexpected tag type: {}".format(tagType))
            values.append(sampleByType[tagType])
        return ", ".join(values)

    def _getTags(self, dbc) -> dict:
        """Return ``{tag name: tag type}`` taken from DESCRIBE rows marked 'TAG'."""
        dbc.query("DESCRIBE {}".format(self._stName))
        tags = {}
        for row in dbc.getQueryResult():
            if row[3] == 'TAG':
                tags[row[0]] = row[1]  # name -> type
        return tags

    def addTag(self, dbc, tagName, tagType):
        if tagName not in self._getTags(dbc):
            dbc.execute("alter table db.{} add tag {} {}".format(self._stName, tagName, tagType))

    def dropTag(self, dbc, tagName):
        if tagName in self._getTags(dbc):
            dbc.execute("alter table db.{} drop tag {}".format(self._stName, tagName))

    def changeTag(self, dbc, oldTag, newTag):
        """Rename ``oldTag`` to ``newTag``, only if old exists and new does not."""
        tags = self._getTags(dbc)
        if oldTag in tags and newTag not in tags:
            dbc.execute("alter table db.{} change tag {} {}".format(self._stName, oldTag, newTag))

class TaskReadData(StateTransitionTask):
    """Run a random aggregate query against each regular table."""

    @classmethod
    def getEndState(cls):
        return None  # meaning doesn't affect state

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canReadData()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        sTable = self._dbManager.getFixedSuperTable()

        # 1 in 5 chance: simulate a broken connection by closing and reopening it.
        # TODO: break connection in all situations
        if random.randrange(5) == 0:
            wt.getDbConn().close()
            wt.getDbConn().open()

        aggregates = [
            '*', 'count(*)', 'avg(speed)',
            # 'twa(speed)', # TODO: this one REQUIRES a where statement, not reasonable
            'sum(speed)', 'stddev(speed)',
            'min(speed)', 'max(speed)', 'first(speed)', 'last(speed)',
        ]  # TODO: add more from 'top'
        for rTbName in sTable.getRegTables(wt.getDbConn()):  # regular tables
            aggExpr = Dice.choice(aggregates)
            try:
                self.execWtSql(wt, "select {} from db.{}".format(aggExpr, rTbName))
            except taos.error.ProgrammingError as err:
                errno2 = err.errno if err.errno > 0 else 0x80000000 + err.errno
                logger.debug("[=] Read Failure: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, wt.getDbConn().getLastSql()))
                raise


class TaskDropSuperTable(StateTransitionTask):
    """Drop the fixed super table, sometimes dropping regular tables first."""

    @classmethod
    def getEndState(cls):
        return StateDbOnly()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canDropFixedSuperTable()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        # 1/2 chance, we'll drop the regular tables one by one, in a randomized
        # sequence
        if Dice.throw(2) == 0:
            numTables = (self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data
                         else self.SMALL_NUMBER_OF_TABLES)
            # Two more names than TaskAddData creates (it uses range(numTables)),
            # so some drops target tables that never existed.
            tblSeq = list(range(2 + numTables))
            random.shuffle(tblSeq)
            tickOutput = False  # if we have spitted out a "d" character for "drop regular table"
            isSuccess = True
            for i in tblSeq:
                regTableName = self.getRegTableName(i)  # "db.reg_table_{}".format(i)
                try:
                    self.execWtSql(wt, "drop table {}".format(
                        regTableName))  # nRows always 0, like MySQL
                except taos.error.ProgrammingError as err:
                    # correcting for strange error number scheme
                    errno2 = err.errno if (
                        err.errno > 0) else 0x80000000 + err.errno
                    if errno2 in [0x362]:  # mnode invalid table name
                        isSuccess = False
                        logger.debug(
                            "[DB] Acceptable error when dropping a table")
                    else:
                        # Still move on to the next table, but leave a trace
                        # instead of ignoring the error silently.
                        logger.debug(
                            "[DB] Unexpected error when dropping table {}: errno=0x{:X}, msg: {}".format(
                                regTableName, errno2, err))
                    continue  # try to delete next regular table

                if not tickOutput:
                    tickOutput = True  # Print only one time
                    print("d" if isSuccess else "f", end="", flush=True)

        # Drop the super table itself
        tblName = self._dbManager.getFixedSuperTableName()
        self.execWtSql(wt, "drop table db.{}".format(tblName))


class TaskAlterTags(StateTransitionTask):
    """Randomly add, drop or rename a tag on the fixed super table."""

    @classmethod
    def getEndState(cls):
        return None  # meaning doesn't affect state

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canDropFixedSuperTable()  # if we can drop it, we can alter tags

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        dbc = wt.getDbConn()
        sTable = self._dbManager.getFixedSuperTable()
        # Index i is taken when Dice.throw(4) == i
        actions = (
            lambda: sTable.addTag(dbc, "extraTag", "int"),
            lambda: sTable.dropTag(dbc, "extraTag"),
            lambda: sTable.dropTag(dbc, "newTag"),
            lambda: sTable.changeTag(dbc, "extraTag", "newTag"),
        )
        actions[Dice.throw(4)]()

class TaskRestartService(StateTransitionTask):
    """Occasionally restart the TDengine service (only with auto_start_service).

    At most one restart task may be active at a time, which is enforced by
    the class-level ``_isRunning`` flag guarded by ``_classLock``.
    """
    _isRunning = False
    _classLock = threading.Lock()

    CHANCE_TO_RESTART_SERVICE = 100

    @classmethod
    def getEndState(cls):
        return None  # meaning doesn't affect state

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        if gConfig.auto_start_service:
            return state.canDropFixedSuperTable()  # Basicallly when we have the super table
        return False  # don't run this otherwise

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        if not gConfig.auto_start_service:  # only execute when we are in -a mode
            print("_a", end="", flush=True)
            return

        # The flag must live on the class: writing ``self._isRunning`` would
        # create a per-instance attribute that no other task object ever sees.
        with TaskRestartService._classLock:
            if TaskRestartService._isRunning:
                print("Skipping restart task, another running already")
                return
            TaskRestartService._isRunning = True

        try:
            if Dice.throw(self.CHANCE_TO_RESTART_SERVICE) == 0:  # 1 in N chance
                dbc = wt.getDbConn()
                dbc.execute("show databases")  # simple delay, align timing with other workers
                gSvcMgr.restart()
        finally:
            # Always release the guard, even if the restart raised
            with TaskRestartService._classLock:
                TaskRestartService._isRunning = False


class TaskAddData(StateTransitionTask):
    """Insert records into regular tables, creating the tables when missing."""

    # Track which table is being actively worked on (shared by all instances)
    activeTable: Set[int] = set()

    # We use these two files to record operations to DB, useful for power-off
    # tests
    fAddLogReady = None
    fAddLogDone = None

    @classmethod
    def prepToRecordOps(cls):
        """Open the operation-recording files once, if gConfig.record_ops is set."""
        if gConfig.record_ops:
            if (cls.fAddLogReady is None):
                logger.info(
                    "Recording in a file operations to be performed...")
                cls.fAddLogReady = open("add_log_ready.txt", "w")
            if (cls.fAddLogDone is None):
                logger.info("Recording in a file operations completed...")
                cls.fAddLogDone = open("add_log_done.txt", "w")

    @classmethod
    def getEndState(cls):
        return StateHasData()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canAddData()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        ds = self._dbManager  # Quite DANGEROUS here, may result in multi-thread client access
        numTables = self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES
        numRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS
        if gConfig.record_ops:
            self.prepToRecordOps()

        tblSeq = list(range(numTables))
        random.shuffle(tblSeq)
        sTable = ds.getFixedSuperTable()
        for i in tblSeq:
            # Only the task that marked a table active may clear the mark;
            # otherwise we would erase another task's claim on it.
            claimed = i not in self.activeTable
            if claimed:
                self.activeTable.add(i)  # marking it active
            else:  # wow already active
                print("x", end="", flush=True)  # concurrent insertion

            try:
                regTableName = self.getRegTableName(i)  # "db.reg_table_{}".format(i)
                sTable.ensureTable(wt.getDbConn(), regTableName)  # Ensure the table exists

                for _ in range(numRecords):  # number of records per table
                    nextInt = ds.getNextInt()
                    if gConfig.record_ops:
                        self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName))
                        self.fAddLogReady.flush()
                        os.fsync(self.fAddLogReady)
                    sql = "insert into {} values ('{}', {});".format(  # removed: tags ('{}', {})
                        regTableName,
                        # ds.getFixedSuperTableName(),
                        # ds.getNextBinary(), ds.getNextFloat(),
                        ds.getNextTick(), nextInt)
                    self.execWtSql(wt, sql)
                    # Successfully wrote the data into the DB, let's record it
                    # somehow
                    te.recordDataMark(nextInt)
                    if gConfig.record_ops:
                        self.fAddLogDone.write(
                            "Wrote {} to {}\n".format(
                                nextInt, regTableName))
                        self.fAddLogDone.flush()
                        os.fsync(self.fAddLogDone)
            finally:
                # Release our claim even when an insert raises, so the table
                # is not left marked active forever.
                if claimed:
                    self.activeTable.discard(i)  # not raising an error, unlike remove


# Deterministic random number generator
class Dice():
    """Reproducible randomness on top of the global ``random`` generator.

    ``seed`` must be called exactly once before ``throw``/``throwRange``;
    it first checks that the platform RNG produces the expected sequence.
    """
    seeded = False  # static, uninitialized

    @classmethod
    def seed(cls, s):  # static
        if cls.seeded:
            raise RuntimeError(
                "Cannot seed the random generator more than once")
        cls.verifyRNG()
        random.seed(s)
        cls.seeded = True  # TODO: protect against multi-threading

    @classmethod
    def verifyRNG(cls):  # Verify that the RNG is determinstic
        random.seed(0)
        observed = tuple(random.randrange(0, 1000) for _ in range(3))
        if observed != (864, 394, 776):
            raise RuntimeError("System RNG is not deterministic")

    @classmethod
    def throw(cls, stop):  # get 0 to stop-1
        return cls.throwRange(0, stop)

    @classmethod
    def throwRange(cls, start, stop):  # up to stop-1
        if cls.seeded:
            return random.randrange(start, stop)
        raise RuntimeError("Cannot throw dice before seeding it")

    @classmethod
    def choice(cls, cList):
        return random.choice(cList)

class LoggingFilter(logging.Filter):
    """Logging filter that currently lets every record through."""

    def filter(self, record: logging.LogRecord):
        if record.levelno < logging.INFO:
            # Below-INFO records: selective suppression (e.g. of messages
            # starting with "[TRD]") was disabled, so these pass as well.
            return True
        return True  # info or above always log


class MyLoggingAdapter(logging.LoggerAdapter):
    """Prefix each message with the last four digits of the thread id."""

    def process(self, msg, kwargs):
        threadTag = threading.get_ident() % 10000
        return f"[{threadTag}]{msg}", kwargs


class SvcManager:
    """Start, stop and restart the TDengine service (``taosd``).

    The sub process and the pumping of its output are delegated to a
    ``ServiceManagerThread`` kept in ``self.svcMgrThread``. That attribute
    being non-``None`` is exactly what ``isRunning()`` reports.
    """

    def __init__(self):
        print("Starting TDengine Service Manager")
        # Signal handlers are installed by MainExec, which forwards to
        # sigIntHandler() / sigUsrHandler() below.
        self.inSigHandler = False  # guards against re-entering a handler
        self.svcMgrThread = None   # ServiceManagerThread while the service is up
        # NOTE(review): threading.Lock is not re-entrant. If a SIGINT is handled
        # on the thread that is inside startTaosService() (which holds the lock
        # during the start-up wait), sigIntHandler() -> stopTaosService() will
        # block on this lock. Confirm whether that can happen in practice.
        self._lock = threading.Lock()
        self._isRestarting = False

    def _doMenu(self):
        """Prompt on stdin until the user picks "1", "2" or "3"; return it."""
        choice = ""
        while True:
            print("\nInterrupting Service Program, Choose an Action: ")
            print("1: Resume")
            print("2: Terminate")
            print("3: Restart")
            # Remember to update the if range below
            while choice == "":
                choice = input("Enter Choice: ")
                if choice != "":
                    break  # done with reading repeated input
            if choice in ["1", "2", "3"]:
                break  # we are done with whole method
            print("Invalid choice, please try again.")
            choice = ""  # reset
        return choice

    def sigUsrHandler(self, signalNumber, frame):
        """SIGUSR1: show an interactive menu to resume, stop or restart."""
        print("Interrupting main thread execution upon SIGUSR1")
        if self.inSigHandler:  # already
            print("Ignoring repeated SIG...")
            return  # do nothing if it's already not running
        self.inSigHandler = True
        try:
            choice = self._doMenu()
            if choice == "1":
                # TODO: can the sub-process be blocked due to us not reading
                # from queue?
                self.sigHandlerResume()
            elif choice == "2":
                self.stopTaosService()
            elif choice == "3":  # Restart
                self.restart()
            else:
                raise RuntimeError("Invalid menu choice: {}".format(choice))
        finally:
            # Reset even if the chosen action raised, so later signals work
            self.inSigHandler = False

    def sigIntHandler(self, signalNumber, frame):
        """SIGINT/SIGTERM: stop the service (ignores re-entrant signals)."""
        print("SvcManager: INT Signal Handler starting...")
        if self.inSigHandler:
            print("Ignoring repeated SIG_INT...")
            return
        self.inSigHandler = True
        try:
            self.stopTaosService()
            print("SvcManager: INT Signal Handler returning...")
        finally:
            self.inSigHandler = False

    def sigHandlerResume(self):
        print("Resuming TDengine service manager thread (main thread)...\n\n")

    def _checkServiceManagerThread(self):
        """Drop the service manager thread once it reports it has stopped."""
        if self.svcMgrThread:  # valid svc mgr thread
            if self.svcMgrThread.isStopped():  # done?
                self.svcMgrThread.procIpcBatch()  # one last time. TODO: appropriate?
                self.svcMgrThread = None  # no more

    def _procIpcAll(self):
        """Pump service output until the service is neither running nor restarting."""
        while self.isRunning() or self.isRestarting():
            if self.isRunning():
                self.svcMgrThread.procIpcBatch()  # regular processing
                self._checkServiceManagerThread()
            elif self.isRestarting():  # was misspelled isRetarting() -> AttributeError
                print("Service restarting...")
            time.sleep(0.5)  # pause, before next round
        print(
            "Service Manager Thread (with subprocess) has ended, main thread now exiting...")

    def startTaosService(self):
        """Kill any stray ``taosd`` processes, then start and wait for a new one.

        Raises:
            RuntimeError: if a service manager thread already exists.
        """
        with self._lock:
            if self.svcMgrThread:
                raise RuntimeError("Cannot start TAOS service when one may already be running")

            # Find if there's already a taosd service, and then kill it
            for proc in psutil.process_iter():
                try:
                    isTaosd = proc.name() == 'taosd'
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    continue  # process vanished, or we may not inspect it
                if isTaosd:
                    print("Killing an existing TAOSD process in 2 seconds... press CTRL-C to interrupe")
                    time.sleep(2.0)
                    try:
                        proc.kill()
                    except psutil.NoSuchProcess:
                        pass  # it exited on its own during the wait

            self.svcMgrThread = ServiceManagerThread()  # create the object
            self.svcMgrThread.start()
            print("Attempting to start TAOS service started, printing out output...")
            self.svcMgrThread.procIpcBatch(
                trimToTarget=10,
                forceOutput=True)  # for printing 10 lines
            print("TAOS service started")

    def stopTaosService(self, outputLines=20):
        """Stop the service, flushing up to ``outputLines`` of its final output."""
        with self._lock:
            if not self.isRunning():
                logger.warning("Cannot stop TAOS service, not running")
                return

            print("Terminating Service Manager Thread (SMT) execution...")
            self.svcMgrThread.stop()
            if self.svcMgrThread.isStopped():
                self.svcMgrThread.procIpcBatch(outputLines)  # one last time
                self.svcMgrThread = None
                print("----- End of TDengine Service Output -----\n")
                print("SMT execution terminated")
            else:
                print("WARNING: SMT did not terminate as expected")

    def run(self):
        """Start the service, pump its output until it ends, then stop it."""
        self.startTaosService()
        self._procIpcAll()  # pump/process all the messages, may encounter SIG + restart
        if self.isRunning():  # if sig handler hasn't destroyed it by now
            self.stopTaosService()  # should have started already

    def restart(self):
        """Stop (if running) and start the service again."""
        if self._isRestarting:
            logger.warning("Cannot restart service when it's already restarting")
            return

        self._isRestarting = True
        try:
            if self.isRunning():
                self.stopTaosService()
            else:
                logger.warning("Service not running when restart requested")
            self.startTaosService()
        finally:
            # Never leave the flag stuck, or _procIpcAll() would spin forever
            self._isRestarting = False

    def isRunning(self):
        return self.svcMgrThread is not None

    def isRestarting(self):
        return self._isRestarting

2256 2257 2258 2259 2260
class ServiceManagerThread:
    """Run ``taosd`` as a sub process and pump its output through IO threads.

    Two daemon threads read the sub process's stdout and stderr. Stdout lines
    are put on ``self._ipcQueue`` and drained by ``procIpcBatch()``.
    ``self._status`` holds one of the ``MainExec.STATUS_*`` values, or None
    before ``start()``.
    """
    MAX_QUEUE_SIZE = 10000

    def __init__(self):
        self._tdeSubProcess = None
        self._thread = None   # stdout reader thread
        self._thread2 = None  # stderr reader thread
        # Created up-front so procIpcBatch()/_trimQueue() are safe before start()
        self._ipcQueue = Queue()
        self._status = None

    def getStatus(self):
        return self._status

    def isRunning(self):
        # return self._thread and self._thread.is_alive()
        return self._status == MainExec.STATUS_RUNNING

    def isStopping(self):
        return self._status == MainExec.STATUS_STOPPING

    def isStopped(self):
        return self._status == MainExec.STATUS_STOPPED

    def start(self):
        """Start the sub process and reader threads; wait up to ~10 s for READY.

        Raises:
            RuntimeError: if already started, or the service never reports ready.
        """
        if self._thread:
            raise RuntimeError("Unexpected _thread")
        if self._tdeSubProcess:
            raise RuntimeError("TDengine sub process already created/running")

        self._status = MainExec.STATUS_STARTING

        self._tdeSubProcess = TdeSubProcess()
        self._tdeSubProcess.start()

        self._ipcQueue = Queue()
        self._thread = threading.Thread(
            target=self.svcOutputReader,
            args=(self._tdeSubProcess.getStdOut(), self._ipcQueue))
        self._thread.daemon = True  # thread dies with the program
        self._thread.start()

        self._thread2 = threading.Thread(
            target=self.svcErrorReader,
            args=(self._tdeSubProcess.getStdErr(), self._ipcQueue))
        self._thread2.daemon = True  # thread dies with the program
        self._thread2.start()

        # Wait for service to start; svcOutputReader() flips the status to
        # RUNNING when it sees TD_READY_MSG.
        for _ in range(10):
            time.sleep(1.0)
            # self.procIpcBatch() # don't pump message during start up
            print("_zz_", end="", flush=True)
            if self._status == MainExec.STATUS_RUNNING:
                logger.info("[] TDengine service READY to process requests")
                return  # now we've started
        # TODO: handle this better?
        self.procIpcBatch(20, True)  # display output before cronking out, trim to last 20 msgs, force output
        raise RuntimeError("TDengine service did not start successfully")

    def stop(self):
        """Ask the sub process to stop; join the IO threads if it exited.

        Can be called from both the main thread and a signal handler.
        """
        print("Terminating TDengine service running as the sub process...")
        if self.isStopped():
            print("Service already stopped")
            return
        if self.isStopping():
            print("Service is already being stopped")
            return
        # Linux will send Control-C generated SIGINT to the TDengine process
        # already, ref:
        # https://unix.stackexchange.com/questions/176235/fork-and-how-signals-are-delivered-to-processes
        if not self._tdeSubProcess:
            raise RuntimeError("sub process object missing")

        self._status = MainExec.STATUS_STOPPING
        retCode = self._tdeSubProcess.stop()
        print("Attempted to stop sub process, got return code: {}".format(retCode))

        if self._tdeSubProcess.isRunning():  # still running
            print("FAILED to stop sub process, it is still running... pid = {}".format(
                self._tdeSubProcess.getPid()))
        else:
            self._tdeSubProcess = None  # not running any more
            self.join()  # stop the thread, change the status, etc.

    def join(self):
        """Join the reader threads and mark the status STOPPED.

        Raises:
            RuntimeError: if called when the status is not STOPPING.
        """
        if not self.isStopping():
            raise RuntimeError(
                "Unexpected status when ending svc mgr thread: {}".format(
                    self._status))

        if self._thread:
            self._thread.join()
            self._thread = None
            self._status = MainExec.STATUS_STOPPED
            # STD ERR thread
            if self._thread2:
                self._thread2.join()
                self._thread2 = None
        else:
            print("Joining empty thread, doing nothing")

    def _trimQueue(self, targetSize):
        """Drop the oldest queued lines until at most ``targetSize`` remain.

        A ``targetSize`` <= 0 disables trimming.
        """
        if targetSize <= 0:
            return  # do nothing
        q = self._ipcQueue
        if q.qsize() <= targetSize:  # no need to trim
            return

        logger.debug("Triming IPC queue to target size: {}".format(targetSize))
        itemsToTrim = q.qsize() - targetSize
        for _ in range(itemsToTrim):
            try:
                q.get_nowait()
            except Empty:
                break  # break out of for loop, no more trimming

    TD_READY_MSG = "TDengine is initialized successfully"

    def procIpcBatch(self, trimToTarget=0, forceOutput=False):
        """Log every line currently queued from the sub process, then return.

        Lines go to ``logger.info`` when ``forceOutput`` is true, otherwise to
        ``logger.debug``. The queue is first trimmed to ``trimToTarget``.
        """
        self._trimQueue(trimToTarget)  # trim if necessary
        # Process all the output generated by the underlying sub process,
        # managed by IO thread
        print("<", end="", flush=True)
        while True:
            try:
                line = self._ipcQueue.get_nowait()  # getting output at fast speed
                self._printProgress("_o")
            except Empty:
                # no more output
                print(".>", end="", flush=True)
                return  # we are done with THIS BATCH
            else:  # got line, printing out
                if forceOutput:
                    logger.info(line)
                else:
                    logger.debug(line)

    _ProgressBars = ["--", "//", "||", "\\\\"]

    def _printProgress(self, msg):  # TODO: assuming 2 chars
        """Print ``msg`` plus a random spinner glyph, then back up 4 columns."""
        print(msg, end="", flush=True)
        pBar = self._ProgressBars[Dice.throw(4)]
        print(pBar, end="", flush=True)
        print('\b\b\b\b', end="", flush=True)

    def svcOutputReader(self, out: IO, queue):
        """IO-thread body: copy decoded stdout lines from the sub process to ``queue``.

        While STARTING, seeing TD_READY_MSG flips the status to RUNNING.
        Returns (and closes ``out``) when the pipe reaches EOF.
        """
        # Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
        for line in iter(out.readline, b''):
            line = line.decode("utf-8").rstrip()
            # This might block, and then causing "out" buffer to block
            queue.put(line)
            self._printProgress("_i")

            if self._status == MainExec.STATUS_STARTING:  # we are starting, let's see if we have started
                if line.find(self.TD_READY_MSG) != -1:  # found
                    logger.info("Waiting for the service to become FULLY READY")
                    time.sleep(1.0)  # wait for the server to truly start. TODO: remove this
                    logger.info("Service is now FULLY READY")
                    self._status = MainExec.STATUS_RUNNING

            # Trim the queue if necessary: TODO: try this 1 out of 10 times
            self._trimQueue(self.MAX_QUEUE_SIZE * 9 // 10)  # trim to 90% size

            if self.isStopping():  # TODO: use thread status instead
                # WAITING for stopping sub process to finish its output
                print("_w", end="", flush=True)

        # meaning sub process must have died
        print("\nNo more output from IO thread managing TDengine service")
        out.close()

    def svcErrorReader(self, err: IO, queue):
        """IO-thread body: echo the sub process's stderr lines to the console.

        ``queue`` is accepted for symmetry with svcOutputReader() but unused.
        """
        for line in iter(err.readline, b''):
            # Decode so the console shows text, not a b'...\n' bytes repr;
            # "replace" keeps the thread alive on malformed bytes.
            print("\nTD Svc STDERR: {}".format(
                line.decode("utf-8", errors="replace").rstrip()))
        err.close()

2439 2440

class TdeSubProcess:
    """Owns the ``taosd`` child process (a ``subprocess.Popen`` handle)."""

    def __init__(self):
        self.subProcess = None  # Popen handle while the service is up

    def getStdOut(self):
        return self.subProcess.stdout

    def getStdErr(self):
        return self.subProcess.stderr

    def isRunning(self):
        return self.subProcess is not None

    def getPid(self):
        return self.subProcess.pid

    def getBuildPath(self):
        """Return the project root that holds ``build/bin/taosd``.

        Walks the tree above this script (cut at "community" or "tests") and
        returns the first directory containing ``taosd`` outside "packaging",
        with the trailing "/build/bin" removed.

        Raises:
            RuntimeError: if no suitable ``taosd`` binary is found.
        """
        selfPath = os.path.dirname(os.path.realpath(__file__))
        if "community" in selfPath:
            projPath = selfPath[:selfPath.find("communit")]
        else:
            projPath = selfPath[:selfPath.find("tests")]

        for root, dirs, files in os.walk(projPath):
            if "taosd" in files:
                rootRealPath = os.path.dirname(os.path.realpath(root))
                if "packaging" not in rootRealPath:
                    return root[:len(root) - len("/build/bin")]
        # Previously fell through to an UnboundLocalError on `buildPath`
        raise RuntimeError("Cannot find taosd binary under: {}".format(projPath))

    def start(self):
        """Launch ``taosd -c <cfg>`` with piped stdout/stderr.

        Any existing ``test/log`` directory is renamed with a timestamp suffix
        first. Raises RuntimeError if a sub process already exists.
        """
        if self.subProcess:  # already there; check before touching any files
            raise RuntimeError("Corrupt process state")

        ON_POSIX = 'posix' in sys.builtin_module_names

        buildPath = self.getBuildPath()  # walks the whole tree: compute once
        taosdPath = buildPath + "/build/bin/taosd"
        cfgPath = buildPath + "/test/cfg"

        # Save the old log files; TDengine re-creates the directory itself
        logPath = buildPath + "/test/log"
        if os.path.exists(logPath):
            logPathSaved = logPath + "_" + time.strftime('%Y-%m-%d-%H-%M-%S')
            logger.info("Saving old log files to: {}".format(logPathSaved))
            os.rename(logPath, logPathSaved)

        svcCmd = [taosdPath, '-c', cfgPath]
        self.subProcess = subprocess.Popen(
            svcCmd, shell=False,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            # bufsize=1, # not supported in binary mode
            close_fds=ON_POSIX
            )  # had text=True, which interferred with reading EOF

    def stop(self):
        """Stop the sub process with SIGINT (waiting up to 10 s).

        Returns:
            -1 if there is no sub process; the real exit code if it had
            already ended; -3 on timeout (the process is kept); -4 when it
            terminated after SIGINT.
        """
        if not self.subProcess:
            print("Sub process already stopped")
            return -1

        retCode = self.subProcess.poll()  # None while the process is alive
        # Compare with None: an exit code of 0 is falsy but still means "ended"
        if retCode is not None:
            self.subProcess = None
        else:  # process still alive, let's interrupt it
            print(
                "Sub process is running, sending SIG_INT and waiting for it to terminate...")
            # sub process should end, then IPC queue should end, causing IO
            # thread to end
            self.subProcess.send_signal(signal.SIGINT)
            try:
                self.subProcess.wait(10)
            except subprocess.TimeoutExpired:
                print("Time out waiting for TDengine service process to exit")
                retCode = -3
            else:
                print("TDengine service process terminated successfully from SIG_INT")
                retCode = -4
                self.subProcess = None
        return retCode
2532

2533 2534 2535 2536 2537 2538 2539 2540 2541 2542 2543 2544 2545 2546 2547 2548 2549 2550 2551 2552 2553 2554 2555 2556 2557 2558 2559 2560
class ThreadStacks:  # stack info for all threads
    """Snapshot of the call stacks of all live threads, keyed by native id."""

    def __init__(self):
        self._allStacks = {}
        allFrames = sys._current_frames()
        for th in threading.enumerate():
            frame = allFrames.get(th.ident)
            if frame is None:
                # Thread appeared/vanished between the two snapshots above;
                # indexing would raise KeyError.
                continue
            self._allStacks[th.native_id] = traceback.extract_stack(frame)

    def print(self, filteredEndName=None, filterInternal=False):
        """Print each captured stack.

        Args:
            filteredEndName: skip stacks whose innermost frame has this name.
            filterInternal: skip stacks whose innermost frame is one of a few
                known idle/bookkeeping functions (waits, IO reader, and the
                ``__init__`` that took this snapshot).
        """
        for thNid, stack in self._allStacks.items():  # for each thread
            lastFrame = stack[-1]
            if filteredEndName and lastFrame.name == filteredEndName:
                continue  # innermost frame matches the filter: skip it
            if filterInternal:
                if lastFrame.name in ['wait', 'invoke_excepthook',
                                      '_wait',  # The Barrier exception
                                      'svcOutputReader',  # the svcMgr thread
                                      '__init__']:  # the thread that extracted the stack
                    continue  # ignore
            # Now print
            print("\n<----- Thread Info for ID: {}".format(thNid))
            for frame in stack:
                print("File {filename}, line {lineno}, in {name}".format(
                    filename=frame.filename, lineno=frame.lineno, name=frame.name))
                print("    {}".format(frame.line))
            print("-----> End of Thread Info\n")
S
Shuduo Sang 已提交
2561

2562 2563 2564
class ClientManager:
    """Drive the client-side crash-generation run and its signal handling."""

    def __init__(self):
        print("Starting service manager")
        # Signal handlers are installed by MainExec, which forwards to us.
        self._status = MainExec.STATUS_RUNNING
        self.tc = None  # ThreadCoordinator, created in run()
        self.inSigHandler = False

    def sigIntHandler(self, signalNumber, frame):
        """SIGINT/SIGTERM: ask the coordinator to stop; force exit on repeat."""
        if self._status != MainExec.STATUS_RUNNING:
            print("Repeated SIGINT received, forced exit...")
            sys.exit(-1)
        self._status = MainExec.STATUS_STOPPING  # immediately set our status

        print("ClientManager: Terminating program...")
        if self.tc is None:
            # Interrupted before run() created the coordinator (e.g. during
            # _printLastNumbers); previously this raised AttributeError.
            print("ClientManager: client not started yet, forced exit...")
            sys.exit(-1)
        self.tc.requestToStop()

    def _doMenu(self):
        """Prompt on stdin until the user picks "1", "2" or "3"; return it."""
        choice = ""
        while True:
            print("\nInterrupting Client Program, Choose an Action: ")
            print("1: Resume")
            print("2: Terminate")
            print("3: Show Threads")
            # Remember to update the if range below
            while choice == "":
                choice = input("Enter Choice: ")
                if choice != "":
                    break  # done with reading repeated input
            if choice in ["1", "2", "3"]:
                break  # we are done with whole method
            print("Invalid choice, please try again.")
            choice = ""  # reset
        return choice

    def sigUsrHandler(self, signalNumber, frame):
        """SIGUSR1: interactive menu (resume, terminate stub, dump thread stacks)."""
        print("Interrupting main thread execution upon SIGUSR1")
        if self.inSigHandler:  # already
            print("Ignoring repeated SIG_USR1...")
            return  # do nothing if it's already not running
        self.inSigHandler = True
        try:
            choice = self._doMenu()
            if choice == "1":
                print("Resuming execution...")
                time.sleep(1.0)
            elif choice == "2":
                print("Not implemented yet")
                time.sleep(1.0)
            elif choice == "3":
                ts = ThreadStacks()
                ts.print()
            else:
                raise RuntimeError("Invalid menu choice: {}".format(choice))
        finally:
            # Reset even on error so later SIGUSR1 signals are not ignored
            self.inSigHandler = False

    def _printLastNumbers(self):  # to verify data durability
        """Print the top ``speed`` values across the regular tables of ``db``.

        Returns early if the queries report no databases or no tables.
        """
        dbManager = DbManager(resetDb=False)
        dbc = dbManager.getDbConn()
        if dbc.query("show databases") == 0:  # no database
            return
        if dbc.query("show tables") == 0:  # no tables
            return

        dbc.execute("use db")
        sTbName = dbManager.getFixedSuperTableName()

        # get all regular tables
        # TODO: analyze result set later
        dbc.query("select TBNAME from db.{}".format(sTbName))
        rTables = dbc.getQueryResult()

        bList = TaskExecutor.BoundedList()
        for rTbName in rTables:  # regular tables
            dbc.query("select speed from db.{}".format(rTbName[0]))
            numbers = dbc.getQueryResult()
            for row in numbers:
                bList.add(row[0])

        print("Top numbers in DB right now: {}".format(bList))
        print("TDengine client execution is about to start in 2 seconds...")
        time.sleep(2.0)
        dbManager = None  # release?

    def prepare(self):
        self._printLastNumbers()

    def run(self, svcMgr):
        """Run the thread coordinator to completion.

        Stops ``svcMgr`` (if given) afterwards, then prints stats and cleans
        up. Returns 1 if the coordinator reports failure, else 0.
        """
        self._printLastNumbers()

        dbManager = DbManager()  # Regular function
        thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps)
        self.tc = ThreadCoordinator(thPool, dbManager)

        self.tc.run()
        if svcMgr:  # gConfig.auto_start_service:
            svcMgr.stopTaosService()
        # Print exec status, etc., AFTER showing messages from the server
        self.conclude()
        # Linux return code: ref https://shapeshed.com/unix-exit-codes/
        return 1 if self.tc.isFailed() else 0

    def conclude(self):
        self.tc.printStats()
        self.tc.getDbManager().cleanUp()
2677 2678

class MainExec:
    """Program entry object: installs signal handlers, runs client or service."""
    STATUS_STARTING = 1
    STATUS_RUNNING = 2
    STATUS_STOPPING = 3
    STATUS_STOPPED = 4

    def __init__(self):
        self._clientMgr = None
        self._svcMgr = None

        signal.signal(signal.SIGTERM, self.sigIntHandler)
        signal.signal(signal.SIGINT, self.sigIntHandler)
        signal.signal(signal.SIGUSR1, self.sigUsrHandler)  # different handler!

    def sigUsrHandler(self, signalNumber, frame):
        """Forward SIGUSR1 to the client manager, else to the service manager."""
        if self._clientMgr:
            self._clientMgr.sigUsrHandler(signalNumber, frame)
        elif self._svcMgr:  # Only if no client mgr, we are running alone
            self._svcMgr.sigUsrHandler(signalNumber, frame)

    def sigIntHandler(self, signalNumber, frame):
        """Forward SIGINT/SIGTERM to the service manager, then the client manager."""
        if self._svcMgr:
            self._svcMgr.sigIntHandler(signalNumber, frame)
        if self._clientMgr:
            self._clientMgr.sigIntHandler(signalNumber, frame)

    def runClient(self):
        """Run the client, optionally auto-starting the service first.

        Returns ClientManager.run()'s exit code, or None on a REST
        connection failure.
        """
        global gSvcMgr
        if gConfig.auto_start_service:
            self._svcMgr = SvcManager()
            gSvcMgr = self._svcMgr  # hack alert
            self._svcMgr.startTaosService()  # we start, don't run

        self._clientMgr = ClientManager()
        ret = None
        try:
            ret = self._clientMgr.run(self._svcMgr)  # stop TAOS service inside
        except requests.exceptions.ConnectionError:
            logger.warning("Failed to open REST connection to DB")
            # ClientManager.run() did not reach its own stopTaosService();
            # don't leave an auto-started taosd running behind us.
            if self._svcMgr and self._svcMgr.isRunning():
                self._svcMgr.stopTaosService()
            # don't raise
        return ret

    def runService(self):
        """Run only the TDengine service until it ends."""
        global gSvcMgr
        self._svcMgr = SvcManager()
        gSvcMgr = self._svcMgr  # save it in a global variable TODO: hack alert

        self._svcMgr.run()  # run to some end state
        self._svcMgr = None
        gSvcMgr = None

    def runTemp(self):  # for debugging purposes
        """Debugging sandbox; currently a no-op (all code is commented out)."""
        # # Hack to exercise reading from disk, imcreasing coverage. TODO: fix
        # dbc = dbState.getDbConn()
        # sTbName = dbState.getFixedSuperTableName()
        # dbc.execute("create database if not exists db")
        # if not dbState.getState().equals(StateEmpty()):
        #     dbc.execute("use db")

        # rTables = None
        # try: # the super table may not exist
        #     sql = "select TBNAME from db.{}".format(sTbName)
        #     logger.info("Finding out tables in super table: {}".format(sql))
        #     dbc.query(sql) # TODO: analyze result set later
        #     logger.info("Fetching result")
        #     rTables = dbc.getQueryResult()
        #     logger.info("Result: {}".format(rTables))
        # except taos.error.ProgrammingError as err:
        #     logger.info("Initial Super table OPS error: {}".format(err))

        # # sys.exit()
        # if ( not rTables == None):
        #     # print("rTables[0] = {}, type = {}".format(rTables[0], type(rTables[0])))
        #     try:
        #         for rTbName in rTables : # regular tables
        #             ds = dbState
        #             logger.info("Inserting into table: {}".format(rTbName[0]))
        #             sql = "insert into db.{} values ('{}', {});".format(
        #                 rTbName[0],
        #                 ds.getNextTick(), ds.getNextInt())
        #             dbc.execute(sql)
        #         for rTbName in rTables : # regular tables
        #             dbc.query("select * from db.{}".format(rTbName[0])) # TODO: check success failure
        #         logger.info("Initial READING operation is successful")
        #     except taos.error.ProgrammingError as err:
        #         logger.info("Initial WRITE/READ error: {}".format(err))

        # Sandbox testing code
        # dbc = dbState.getDbConn()
        # while True:
        #     rows = dbc.query("show databases")
        #     print("Rows: {}, time={}".format(rows, time.time()))
        return

S
Steven Li 已提交
2772

2773
def _buildArgParser() -> argparse.ArgumentParser:
    """Build the command-line parser for the TDengine crash generator.

    Returns:
        A fully configured ``argparse.ArgumentParser``; the caller decides
        which argument vector to parse with it.
    """
    # Super cool Python argument library:
    # https://docs.python.org/3/library/argparse.html
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=textwrap.dedent('''\
            TDengine Auto Crash Generator (PLEASE NOTICE the Prerequisites Below)
            ---------------------------------------------------------------------
            1. You build TDengine in the top level ./build directory, as described in official docs
            2. You run the server there before this script: ./build/bin/taosd -c test/cfg

            '''))

    # NOTE: every "(default: ...)" in the help texts below must match the
    # actual `default=` value of the same option.
    parser.add_argument(
        '-a',
        '--auto-start-service',
        action='store_true',
        help='Automatically start/stop the TDengine service (default: false)')
    parser.add_argument(
        '-c',
        '--connector-type',
        action='store',
        default='native',
        type=str,
        help='Connector type to use: native, rest, or mixed (default: native)')
    parser.add_argument(
        '-d',
        '--debug',
        action='store_true',
        help='Turn on DEBUG mode for more logging (default: false)')
    parser.add_argument(
        '-e',
        '--run-tdengine',
        action='store_true',
        help='Run TDengine service in foreground (default: false)')
    parser.add_argument(
        '-l',
        '--larger-data',
        action='store_true',
        help='Write larger amount of data during write operations (default: false)')
    parser.add_argument(
        '-p',
        '--per-thread-db-connection',
        action='store_true',
        help='Use a separate db connection per thread instead of a single shared one (default: false)')
    parser.add_argument(
        '-r',
        '--record-ops',
        action='store_true',
        help='Use a pair of always-fsynced files to record operations performing + performed, for power-off tests (default: false)')
    parser.add_argument(
        '-s',
        '--max-steps',
        action='store',
        default=1000,
        type=int,
        help='Maximum number of steps to run (default: 1000)')
    parser.add_argument(
        '-t',
        '--num-threads',
        action='store',
        default=5,
        type=int,
        help='Number of threads to run (default: 5)')
    parser.add_argument(
        '-x',
        '--continue-on-exception',
        action='store_true',
        help='Continue execution after encountering unexpected/disallowed errors/exceptions (default: false)')
    return parser


def main(argv=None):
    """Parse the command line, set up logging, then run the service or client.

    Args:
        argv: Argument list to parse (excluding the program name). ``None``
            (the default) makes argparse read ``sys.argv[1:]``, as before.

    Returns:
        The result of ``MainExec.runClient()`` in client mode; ``None`` when
        running the TDengine service (``--run-tdengine``).

    Side effects:
        Rebinds the module-level globals ``gConfig`` and ``logger``.
    """
    global gConfig
    gConfig = _buildArgParser().parse_args(argv)

    # Logging Stuff
    global logger
    _logger = logging.getLogger('CrashGen')  # real logger
    _logger.addFilter(LoggingFilter())
    ch = logging.StreamHandler()
    _logger.addHandler(ch)

    # Logging adapter, to be used as a logger
    logger = MyLoggingAdapter(_logger, [])

    if gConfig.debug:
        logger.setLevel(logging.DEBUG)  # default seems to be INFO
    else:
        logger.setLevel(logging.INFO)

    Dice.seed(0)  # initial seeding of dice

    # Run server or client
    mExec = MainExec()
    if gConfig.run_tdengine:  # run server
        # NOTE(review): runService()'s result is not propagated, so the
        # process exits with status 0 in this mode; confirm that is intended.
        mExec.runService()
    else:
        return mExec.runClient()
2890

S
Shuduo Sang 已提交
2891

2892
if __name__ == "__main__":
S
Steven Li 已提交
2893 2894 2895
    exitCode = main()
    # print("Exiting with code: {}".format(exitCode))
    sys.exit(exitCode)