crash_gen.py 101.5 KB
Newer Older
S
Shuduo Sang 已提交
1
# -----!/usr/bin/python3.7
S
Steven Li 已提交
2 3 4 5 6 7 8 9 10 11 12 13
###################################################################
#           Copyright (c) 2016 by TAOS Technologies, Inc.
#                     All rights reserved.
#
#  This file is proprietary and confidential to TAOS Technologies.
#  No part of this file may be reproduced, stored, transmitted,
#  disclosed or used in any form or by any means other than as
#  expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-
S
Shuduo Sang 已提交
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
# For type hinting before definition, ref:
# https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel
from __future__ import annotations
import taos
import crash_gen
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.log import *
from queue import Queue, Empty
from typing import IO
from typing import Set
from typing import Dict
from typing import List
from requests.auth import HTTPBasicAuth
import textwrap
import datetime
import logging
import time
import random
import threading
import requests
import copy
import argparse
import getopt
39

S
Steven Li 已提交
40
import sys
41
import os
42 43
import io
import signal
44
import traceback
45 46 47 48 49 50 51

# psutil is a hard requirement: bail out early with an install hint.
try:
    import psutil
except ImportError:  # only a missing module should trigger the hint, not Ctrl-C etc.
    print("Psutil module needed, please install: sudo pip3 install psutil")
    sys.exit(-1)

52 53 54 55
# This script only supports Python 3; refuse to continue on anything older.
if sys.version_info < (3,):
    raise Exception("Must be using Python 3")

S
Steven Li 已提交
56

S
Shuduo Sang 已提交
57
# Global variables, tried to keep a small number.
58 59 60

# Command-line/Environment Configurations, will set a bit later
# ConfigNameSpace = argparse.Namespace
S
Shuduo Sang 已提交
61
gConfig = argparse.Namespace()  # Dummy value, will be replaced later
logger = None  # module-wide logger handle; starts out unset
S
Steven Li 已提交
63

S
Shuduo Sang 已提交
64 65

def runThread(wt: WorkerThread):
    """Thread entry point: invoke ``run()`` on the given worker.

    Used as the ``target`` of the ``threading.Thread`` that each
    ``WorkerThread`` creates for itself.
    """
    wt.run()
67

S
Shuduo Sang 已提交
68

69 70
class CrashGenError(Exception):
    """Error raised by the crash generator, carrying an optional error number.

    Attributes:
        msg: description of the error, or None when not supplied.
        errno: optional error code, or None when not supplied.
    """

    def __init__(self, msg=None, errno=None):
        self.msg = msg
        self.errno = errno

    def __str__(self):
        # __str__ must return a str: handing back a None (or otherwise
        # non-string) msg would make str(err) itself raise TypeError.
        return "" if self.msg is None else str(self.msg)

S
Shuduo Sang 已提交
77

S
Steven Li 已提交
78
class WorkerThread:
    """A worker that executes one task per step, paced by a ThreadCoordinator.

    Each instance owns a ``threading.Thread`` (whose target is ``runThread``)
    and a per-thread "step gate" (a ``threading.Event``). In every iteration
    of its loop the worker first waits at the coordinator's shared barrier,
    then waits at its own gate until ``tapStepGate()`` is called, and only
    then fetches and executes a task.

    Depending on ``gConfig.per_thread_db_connection`` the worker either uses
    a DB connection of its own, or the one obtained through the
    coordinator's DB manager.
    """

    def __init__(self, pool: ThreadPool, tid, tc: ThreadCoordinator):
        """Create the worker (note: main thread context!).

        Args:
            pool: the ThreadPool this worker belongs to.
            tid: identifier of this worker, used in log messages.
            tc: the ThreadCoordinator pacing this worker.

        Raises:
            RuntimeError: when a per-thread connection is requested and
                ``gConfig.connector_type`` is not 'native', 'rest' or 'mixed'.
        """
        self._pool = pool
        self._tid = tid
        self._tc = tc  # type: ThreadCoordinator
        self._thread = threading.Thread(target=runThread, args=(self,))
        self._stepGate = threading.Event()

        # Let us have a DB connection of our own; stays None when the
        # connection of the coordinator's DB manager is used instead.
        self._dbConn = None
        if gConfig.per_thread_db_connection:  # type: ignore
            self._dbConn = self._createDbConn(gConfig.connector_type)

        self._dbInUse = False  # if "use db" was executed already

    @staticmethod
    def _createDbConn(connectorType):
        """Build a DbConn for the configured connector type ('mixed' picks one at random)."""
        if connectorType == 'native':
            return DbConn.createNative()
        if connectorType == 'rest':
            return DbConn.createRest()
        if connectorType == 'mixed':
            if Dice.throw(2) == 0:  # 1/2 chance
                return DbConn.createNative()
            return DbConn.createRest()
        raise RuntimeError("Unexpected connector type: {}".format(connectorType))

    def logDebug(self, msg):
        """Log ``msg`` at debug level, prefixed with this worker's id."""
        logger.debug("    TRD[{}] {}".format(self._tid, msg))

    def logInfo(self, msg):
        """Log ``msg`` at info level, prefixed with this worker's id."""
        logger.info("    TRD[{}] {}".format(self._tid, msg))

    def dbInUse(self):
        """Return whether "use db" was already executed in the current step."""
        return self._dbInUse

    def useDb(self):
        """Execute "use db" unless it was already done, and remember that it was."""
        if not self._dbInUse:
            self.execSql("use db")
        self._dbInUse = True

    def getTaskExecutor(self):
        """Return the coordinator's current task executor."""
        return self._tc.getTaskExecutor()

    def start(self):
        """Start the underlying thread."""
        self._thread.start()  # AFTER the thread is recorded

    def run(self):
        """Thread body: open the per-thread connection, run the task loop, clean up.

        Initialization here happens after the thread starts, in the thread
        context.
        """
        logger.info("Starting to run thread: {}".format(self._tid))

        if gConfig.per_thread_db_connection:  # type: ignore
            logger.debug("Worker thread openning database connection")
            self._dbConn.open()

        try:
            self._doTaskLoop()
        finally:
            # Clean up even when the task loop raised, so the per-thread
            # connection is never left open.
            if gConfig.per_thread_db_connection:  # type: ignore
                self._dbConn.close()

    def _doTaskLoop(self):
        """Execute one task per step until told to stop.

        The loop ends when the shared barrier is broken (main thread timed
        out), or when the coordinator reports it is no longer running once
        this worker has crossed its gate.
        """
        tc = self._tc  # Thread Coordinator, the overall master
        while True:
            try:
                tc.crossStepBarrier()  # shared barrier first, INCLUDING the last one
            except threading.BrokenBarrierError:  # main thread timed out
                logger.debug("[TRD] Worker thread exiting due to main thread barrier time-out")
                break

            logger.debug("[TRD] Worker thread [{}] exited barrier...".format(self._tid))
            self.crossStepGate()   # then per-thread gate, after being tapped
            logger.debug("[TRD] Worker thread [{}] exited step gate...".format(self._tid))
            if not tc.isRunning():
                logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...")
                break

            # Fetch a task from the Thread Coordinator
            logger.debug("[TRD] Worker thread [{}] about to fetch task".format(self._tid))
            task = tc.fetchTask()

            # Execute such a task
            logger.debug(
                "[TRD] Worker thread [{}] about to execute task: {}".format(
                    self._tid, task.__class__.__name__))
            task.execute(self)
            tc.saveExecutedTask(task)
            logger.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid))

            self._dbInUse = False  # there may be changes between steps

    def verifyThreadSelf(self):  # ensure we are called by this own thread
        """Raise RuntimeError unless called from this worker's own thread."""
        if threading.get_ident() != self._thread.ident:
            raise RuntimeError("Unexpectly called from other threads")

    def verifyThreadMain(self):  # ensure we are called by the main thread
        """Raise RuntimeError unless called from the main thread."""
        if threading.get_ident() != threading.main_thread().ident:
            raise RuntimeError("Unexpectly called from other threads")

    def verifyThreadAlive(self):
        """Raise RuntimeError if the underlying thread is not alive."""
        if not self._thread.is_alive():
            raise RuntimeError("Unexpected dead thread")

    # A gate is different from a barrier in that a thread needs to be "tapped"
    def crossStepGate(self):
        """Block at the per-thread gate until tapped, then re-arm the gate.

        Only allowed from this worker's own thread.
        """
        self.verifyThreadAlive()
        self.verifyThreadSelf()  # only allowed by ourselves

        # Wait again at the "gate", waiting to be "tapped"
        logger.debug(
            "[TRD] Worker thread {} about to cross the step gate".format(
                self._tid))
        self._stepGate.wait()
        self._stepGate.clear()

    def tapStepGate(self):  # give it a tap, release the thread waiting there
        """Release this worker from its gate; only allowed from the main thread."""
        self.verifyThreadAlive()
        self.verifyThreadMain()  # only allowed for main thread

        logger.debug("[TRD] Tapping worker thread {}".format(self._tid))
        self._stepGate.set()  # wake up!
        time.sleep(0)  # let the released thread run a bit

    def execSql(self, sql):  # TODO: expose DbConn directly
        """Run ``sql`` through ``execute()`` of this worker's connection."""
        return self.getDbConn().execute(sql)

    def querySql(self, sql):  # TODO: expose DbConn directly
        """Run ``sql`` through ``query()`` of this worker's connection."""
        return self.getDbConn().query(sql)

    def getQueryResult(self):
        """Return ``getQueryResult()`` of this worker's connection."""
        return self.getDbConn().getQueryResult()

    def getDbConn(self):
        """Return the per-thread connection, or else the DB manager's connection."""
        if gConfig.per_thread_db_connection:
            return self._dbConn
        return self._tc.getDbManager().getDbConn()
237

238
# The coordinator of all worker threads, mostly running in main thread
S
Shuduo Sang 已提交
239 240


241
class ThreadCoordinator:
    """Drives all worker threads through the test, one step at a time.

    Mostly running in the main thread. For every step, ``run()`` crosses a
    shared barrier together with the workers, performs the DB state
    transition for the tasks executed in the previous step while the workers
    wait at their gates, and then taps every worker's gate to release it
    into the next step.
    """

    def __init__(self, pool: ThreadPool, dbManager):
        """Set up the coordinator.

        Args:
            pool: the ThreadPool whose workers are coordinated.
            dbManager: the DB manager handing out the state machine and
                the DB connection.
        """
        self._curStep = -1  # first step is 0
        self._pool = pool
        self._te = None  # prepare for every new step
        self._dbManager = dbManager
        self._executedTasks: List[Task] = []  # in a given step
        self._lock = threading.RLock()  # sync access for a few things

        # one barrier for all threads: every worker plus the main thread
        self._stepBarrier = threading.Barrier(self._pool.numThreads + 1)
        self._execStats = ExecutionStats()
        self._runStatus = MainExec.STATUS_RUNNING

    def getTaskExecutor(self):
        """Return the task executor of the current step (None when not running)."""
        return self._te

    def getDbManager(self) -> DbManager:
        """Return the DB manager given at construction."""
        return self._dbManager

    def crossStepBarrier(self, timeout=None):
        """Wait at the shared step barrier, optionally with a timeout in seconds.

        Raises:
            threading.BrokenBarrierError: when the barrier is broken or
                reset, or the wait times out.
        """
        self._stepBarrier.wait(timeout)

    def requestToStop(self):
        """Mark the run as stopping and register it as a user interruption."""
        self._runStatus = MainExec.STATUS_STOPPING
        self._execStats.registerFailure("User Interruption")

    def _runShouldEnd(self, transitionFailed, hasAbortedTask, workerTimeout):
        """Return True when the main loop in ``run()`` must not start another step."""
        maxSteps = gConfig.max_steps  # type: ignore
        return bool(
            self._curStep >= (maxSteps - 1)  # maxStep==10, last curStep should be 9
            or self._runStatus != MainExec.STATUS_RUNNING
            or transitionFailed
            or hasAbortedTask
            or workerTimeout
        )

    def _hasAbortedTask(self):  # from execution of previous step
        """Return True when any task saved for the previous step was aborted."""
        return any(task.isAborted() for task in self._executedTasks)

    def _releaseAllWorkerThreads(self, transitionFailed):
        """Advance to the next step and tap all workers to let them run it.

        No task executor is installed when the transition failed, which
        tells the released workers to stop (see ``isRunning()``).
        """
        self._curStep += 1  # we are about to get into next step. TODO: race condition here!
        # Now not all threads had time to go to sleep
        logger.debug(
            "--\r\n\n--> Step {} starts with main thread waking up".format(self._curStep))

        # A new TE for the new step
        self._te = None  # set to empty first, to signal worker thread to stop
        if not transitionFailed:  # only if not failed
            self._te = TaskExecutor(self._curStep)

        logger.debug("[TRD] Main thread waking up at step {}, tapping worker threads".format(
                self._curStep))  # Now not all threads had time to go to sleep
        # Worker threads will wake up at this point, and each execute its own task
        self.tapAllThreads()  # release all worker thread from their "gate"

    def _syncAtBarrier(self):
        """Cross the shared barrier (15 second timeout) and reset it for the next step.

        Raises:
            threading.BrokenBarrierError: when not all workers arrive in time.
        """
        # Now main thread (that's us) is ready to enter a step
        # let other threads go past the pool barrier, but wait at the
        # thread gate
        logger.debug("[TRD] Main thread about to cross the barrier")
        self.crossStepBarrier(timeout=15)
        self._stepBarrier.reset()  # Other worker threads should now be at the "gate"
        logger.debug("[TRD] Main thread finished crossing the barrier")

    def _doTransition(self):
        """End the previous step: transition the DB state from the executed tasks.

        Returns:
            True when the transition failed because of a broken DB
            connection ('network unavailable'), False otherwise.

        Raises:
            taos.error.ProgrammingError: any other programming error is
                re-raised unchanged.
        """
        transitionFailed = False
        try:
            sm = self._dbManager.getStateMachine()
            logger.debug("[STT] starting transitions")
            # at end of step, transition the DB state
            sm.transition(self._executedTasks)
            logger.debug("[STT] transition ended")
            # Due to limitation (or maybe not) of the Python library,
            # we cannot share connections across threads
            if sm.hasDatabase():
                for t in self._pool.threadList:
                    logger.debug("[DB] use db for all worker threads")
                    # main thread executing "use db" on behalf of every
                    # worker thread
                    t.useDb()
        except taos.error.ProgrammingError as err:
            if err.msg == 'network unavailable':  # broken DB connection
                logger.info("DB connection broken, execution failed")
                traceback.print_stack()
                transitionFailed = True
                self._te = None  # Not running any more
                self._execStats.registerFailure("Broken DB Connection")
                # do not bail out here: all threads still need to be
                # tapped at the end, and maybe signalled to stop
            else:
                raise

        self.resetExecutedTasks()  # clear the tasks after we are done
        # Get ready for next step
        logger.debug("<-- Step {} finished, trasition failed = {}".format(self._curStep, transitionFailed))
        return transitionFailed

    def run(self):
        """Create the workers and coordinate them step by step until the run ends.

        The loop stops on reaching ``gConfig.max_steps``, a stop request, a
        failed transition, an aborted task, or a worker time-out at the
        barrier. Afterwards all workers are tapped one last time without a
        task executor, and joined.
        """
        self._pool.createAndStartThreads(self)

        # Coordinate all threads step by step
        self._curStep = -1  # not started yet

        self._execStats.startExec()  # start the stop watch
        transitionFailed = False
        hasAbortedTask = False
        workerTimeout = False
        while not self._runShouldEnd(transitionFailed, hasAbortedTask, workerTimeout):
            if not gConfig.debug:  # print this only if we are not in debug mode
                print(".", end="", flush=True)

            try:
                self._syncAtBarrier()  # For now just cross the barrier
            except threading.BrokenBarrierError:
                logger.info("Main loop aborted, caused by worker thread time-out")
                self._execStats.registerFailure("Aborted due to worker thread timeout")
                print("\n\nWorker Thread time-out detected, important thread info:")
                ts = ThreadStacks()
                ts.print(filterInternal=True)
                workerTimeout = True
                break

            # At this point, all threads should be past the overall "barrier" and before the per-thread "gate"
            # We use this period to do house keeping work, when all worker
            # threads are QUIET.
            hasAbortedTask = self._hasAbortedTask()  # from previous step
            if hasAbortedTask:
                logger.info("Aborted task encountered, exiting test program")
                self._execStats.registerFailure("Aborted Task Encountered")
                break  # do transition only if tasks are error free

            # Ending previous step
            transitionFailed = self._doTransition()  # To start, we end step -1 first
            # Then we move on to the next step
            self._releaseAllWorkerThreads(transitionFailed)

        if hasAbortedTask or transitionFailed:  # abnormal ending, workers waiting at "gate"
            logger.debug("Abnormal ending of main thraed")
        elif workerTimeout:
            logger.debug("Abnormal ending of main thread, due to worker timeout")
        else:  # regular ending, workers waiting at "barrier"
            logger.debug("Regular ending, main thread waiting for all worker threads to stop...")
            self._syncAtBarrier()

        self._te = None  # No more executor, time to end
        logger.debug("Main thread tapping all threads one last time...")
        self.tapAllThreads()  # Let the threads run one last time

        logger.debug("\r\n\n--> Main thread ready to finish up...")
        logger.debug("Main thread joining all threads")
        self._pool.joinAll()  # Get all threads to finish
        logger.info("\nAll worker threads finished")
        self._execStats.endExec()

    def printStats(self):
        """Print the execution statistics."""
        self._execStats.printStats()

    def isFailed(self):
        """Return whether the execution statistics report a failure."""
        return self._execStats.isFailed()

    def getExecStats(self):
        """Return the execution statistics object."""
        return self._execStats

    def tapAllThreads(self):  # in a deterministic manner
        """Tap every worker's gate, in an order shuffled by ``Dice``."""
        wakeSeq = []
        for i in range(self._pool.numThreads):  # generate a random sequence
            if Dice.throw(2) == 1:
                wakeSeq.append(i)
            else:
                wakeSeq.insert(0, i)
        logger.debug(
            "[TRD] Main thread waking up worker threads: {}".format(
                str(wakeSeq)))
        # TODO: set dice seed to a deterministic value
        for i in wakeSeq:
            # TODO: maybe a bit too deep?!
            self._pool.threadList[i].tapStepGate()
            time.sleep(0)  # yield

    def isRunning(self):
        """Return True while a task executor is installed for the current step."""
        return self._te is not None

    def fetchTask(self) -> Task:
        """Create a new task of a type picked by the DB state machine.

        Raises:
            RuntimeError: when called while not running.
        """
        if not self.isRunning():  # no task
            raise RuntimeError("Cannot fetch task when not running")

        # pick a task type for current state
        taskType = self.getDbManager().getStateMachine().pickTaskType()
        # create a task from it
        return taskType(self.getDbManager(), self._execStats)

    def resetExecutedTasks(self):
        """Forget the tasks recorded for the step that just ended."""
        self._executedTasks = []  # should be under single thread

    def saveExecutedTask(self, task):
        """Record ``task`` as executed in the current step (guarded by the lock)."""
        with self._lock:
            self._executedTasks.append(task)
458 459

# We define a class to run a number of threads in locking steps.
S
Shuduo Sang 已提交
460 461


462
class ThreadPool:
    """Holds the worker threads that are run in locking steps.

    Attributes:
        numThreads: number of worker threads to create.
        maxSteps: maximum number of steps, as given at construction.
        curStep: step counter, initialised to 0.
        threadList: the WorkerThread objects created so far.
    """

    def __init__(self, numThreads, maxSteps):
        self.numThreads = numThreads
        self.maxSteps = maxSteps
        # Internal class variables
        self.curStep = 0
        self.threadList: List[WorkerThread] = []

    # starting to run all the threads, in locking steps
    def createAndStartThreads(self, tc: ThreadCoordinator):
        """Create ``numThreads`` workers bound to ``tc``, record and start each."""
        for tid in range(0, self.numThreads):  # Create the threads
            workerThread = WorkerThread(self, tid, tc)
            # record the worker before starting it
            self.threadList.append(workerThread)
            workerThread.start()  # start, but should block immediately before step 0

    def joinAll(self):
        """Block until the thread of every recorded worker has finished."""
        for workerThread in self.threadList:
            logger.debug("Joining thread...")
            workerThread._thread.join()

482 483
# A queue of contiguous POSITIVE integers, used by DbManager to generate continuous numbers
# for new table names
S
Shuduo Sang 已提交
484 485


S
Steven Li 已提交
486 487
class LinearQueue:
    """A window ``[firstIndex..lastIndex]`` of positive integers with in-use marks.

    New indexes are appended at the tail with ``push()`` and removed from
    the head with ``pop()``. An index that is marked "in use" cannot be
    popped, and cannot be allocated a second time until released.

    Attributes:
        firstIndex: smallest index still in the queue (starts at 1).
        lastIndex: largest index handed out so far (starts at 0, i.e. empty).
        inUse: the indexes that are in use right now.
    """

    def __init__(self):
        self.firstIndex = 1  # 1st ever element
        self.lastIndex = 0
        self._lock = threading.RLock()  # our functions may call each other
        self.inUse = set()  # the indexes that are in use right now

    def toText(self):
        """Return a printable summary of the index range and in-use set."""
        return "[{}..{}], in use: {}".format(
            self.firstIndex, self.lastIndex, self.inUse)

    # Push (add new element, largest) to the tail, and mark it in use
    def push(self):
        """Append the next index at the tail, mark it in use, and return it."""
        with self._lock:
            self.lastIndex += 1
            self.allocate(self.lastIndex)  # mark it in use immediately
            return self.lastIndex

    def pop(self):
        """Remove and return the head index.

        Returns:
            The removed index, or False when the queue is empty or the head
            index is currently in use.
        """
        with self._lock:
            if self.isEmpty():
                return False  # TODO: None?

            index = self.firstIndex
            if index in self.inUse:
                return False

            self.firstIndex += 1
            return index

    def isEmpty(self):
        """Return True when the queue holds no index."""
        return self.firstIndex > self.lastIndex

    def popIfNotEmpty(self):
        """Like ``pop()``, but return 0 when the queue is empty."""
        with self._lock:
            if self.isEmpty():
                return 0
            return self.pop()

    def allocate(self, i):
        """Mark index ``i`` as in use.

        Raises:
            RuntimeError: if ``i`` is already in use.
        """
        with self._lock:
            if i in self.inUse:
                raise RuntimeError(
                    "Cannot re-use same index in queue: {}".format(i))
            self.inUse.add(i)

    def release(self, i):
        """Clear the in-use mark of index ``i``.

        Raises:
            KeyError: if ``i`` is not currently in use.
        """
        with self._lock:
            self.inUse.remove(i)  # KeyError possible, TODO: why?

    def size(self):
        """Return the number of indexes currently in the queue."""
        return self.lastIndex + 1 - self.firstIndex

    def pickAndAllocate(self):
        """Pick a random index that is not in use, mark it in use, and return it.

        Returns:
            The allocated index, or None when the queue is empty or no free
            index was hit within ``10 * size()`` random draws.
        """
        with self._lock:
            # The emptiness test must happen under the lock as well,
            # otherwise the queue can change between the test and the draw.
            if self.isEmpty():
                return None
            for _ in range(self.size() * 10):  # give up after 10x iterations
                ret = Dice.throwRange(self.firstIndex, self.lastIndex + 1)
                if ret not in self.inUse:
                    self.allocate(ret)
                    return ret
            return None

S
Shuduo Sang 已提交
562

563
class DbConn:
    """Base class of the database connections (native and REST).

    Subclasses set ``_type`` and implement ``openByType()``, ``execute()``,
    ``query()``, ``getQueryResult()``, ``getResultRows()`` and
    ``getResultCols()``; the base versions of these only raise.
    """

    TYPE_NATIVE = "native-c"
    TYPE_REST = "rest-api"
    TYPE_INVALID = "invalid"

    @classmethod
    def create(cls, connType):
        """Return a new connection object for ``connType``.

        Raises:
            RuntimeError: if ``connType`` is neither TYPE_NATIVE nor TYPE_REST.
        """
        if connType == cls.TYPE_NATIVE:
            return DbConnNative()
        if connType == cls.TYPE_REST:
            return DbConnRest()
        raise RuntimeError(
            "Unexpected connection type: {}".format(connType))

    @classmethod
    def createNative(cls):
        """Return a new native connection object."""
        return cls.create(cls.TYPE_NATIVE)

    @classmethod
    def createRest(cls):
        """Return a new REST connection object."""
        return cls.create(cls.TYPE_REST)

    def __init__(self):
        self.isOpen = False
        self._type = self.TYPE_INVALID

    def open(self):
        """Open the connection via ``openByType()`` and mark it open.

        Raises:
            RuntimeError: if the connection is already open.
        """
        if self.isOpen:
            raise RuntimeError("Cannot re-open an existing DB connection")

        # below implemented by child classes
        self.openByType()

        logger.debug("[DB] data connection opened, type = {}".format(self._type))
        self.isOpen = True

    def resetDb(self):  # reset the whole database, etc.
        """Drop the database named ``db`` if it exists.

        Raises:
            RuntimeError: if the connection is not open.
        """
        if not self.isOpen:
            raise RuntimeError(
                "Cannot reset database until connection is open")

        self.execute('drop database if exists db')
        logger.debug("Resetting DB, dropped database")

    def queryScalar(self, sql) -> int:
        """Run ``sql`` and return its single result value (see ``_queryAny``)."""
        return self._queryAny(sql)

    def queryString(self, sql) -> str:
        """Run ``sql`` and return its single result value (see ``_queryAny``)."""
        return self._queryAny(sql)

    def _queryAny(self, sql):
        """Run ``sql`` and return the only cell of its 1x1 result, as-is.

        Raises:
            RuntimeError: if the connection is not open, or the result is
                not exactly one row with one column.
        """
        if not self.isOpen:
            raise RuntimeError("Cannot query database until connection is open")
        nRows = self.query(sql)
        if nRows != 1:
            raise RuntimeError("Unexpected result for query: {}, rows = {}".format(sql, nRows))
        if self.getResultRows() != 1 or self.getResultCols() != 1:
            raise RuntimeError("Unexpected result set for query: {}".format(sql))
        return self.getQueryResult()[0][0]

    def use(self, dbName):
        """Execute ``use <dbName>``."""
        self.execute("use {}".format(dbName))

    def hasDatabases(self):
        """Return True when "show databases" yields at least one row."""
        return self.query("show databases") > 0

    def hasTables(self):
        """Return True when "show tables" yields at least one row."""
        return self.query("show tables") > 0

    # The hooks below must be provided by the child classes. They raise
    # NotImplementedError, which is a RuntimeError, so callers catching
    # RuntimeError keep working.

    def execute(self, sql):
        """Execute ``sql``; to be overridden."""
        raise NotImplementedError("Unexpected execution, should be overriden")

    def query(self, sql) -> int:  # return num rows returned
        """Run a query and return the number of rows; to be overridden."""
        raise NotImplementedError("Unexpected execution, should be overriden")

    def openByType(self):
        """Do the type-specific part of ``open()``; to be overridden."""
        raise NotImplementedError("Unexpected execution, should be overriden")

    def getQueryResult(self):
        """Return the data of the last query; to be overridden."""
        raise NotImplementedError("Unexpected execution, should be overriden")

    def getResultRows(self):
        """Return the row count of the last query; to be overridden."""
        raise NotImplementedError("Unexpected execution, should be overriden")

    def getResultCols(self):
        """Return the column count of the last query; to be overridden."""
        raise NotImplementedError("Unexpected execution, should be overriden")

# Sample: curl -u root:taosdata -d "show databases" localhost:6020/rest/sql
S
Shuduo Sang 已提交
656 657


658 659 660 661
class DbConnRest(DbConn):
    """DbConn implementation that posts every statement to a REST endpoint.

    Sample: curl -u root:taosdata -d "show databases" localhost:6020/rest/sql
    """

    def __init__(self):
        super().__init__()
        self._type = self.TYPE_REST
        self._url = "http://localhost:6020/rest/sql"  # fixed for now
        self._result = None  # JSON body of the last successful request

    def openByType(self):  # Open connection
        """Nothing to open for REST."""
        pass  # do nothing, always open

    def close(self):
        """Mark the connection closed.

        Raises:
            RuntimeError: if the connection is not open.
        """
        if not self.isOpen:
            raise RuntimeError(
                "Cannot clean up database until connection is open")
        # Do nothing for REST
        logger.debug("[DB] REST Database connection closed")
        self.isOpen = False

    def _doSql(self, sql):
        """Post ``sql`` to the REST endpoint and validate the JSON response.

        Returns:
            The value of 'rows' in the response, or 0 when absent.

        Raises:
            taos.error.ProgrammingError: when the response reports
                status 'error' (carrying the response's description and code).
            RuntimeError: when the response lacks 'status', reports an error
                without 'code', or has an unexpected status.
        """
        try:
            r = requests.post(self._url,
                              data=sql,
                              auth=HTTPBasicAuth('root', 'taosdata'))
        except requests.exceptions.RequestException:
            print("REST API Failure (TODO: more info here)")
            raise
        rj = r.json()
        # Sanity check for the "Json Result"
        if 'status' not in rj:
            raise RuntimeError("No status in REST response")

        if rj['status'] == 'error':  # clearly reported error
            if 'code' not in rj:  # error without code
                raise RuntimeError("REST error return without code")
            errno = rj['code']  # May need to massage this in the future
            # An error without 'desc' must still surface as a
            # ProgrammingError, not as a KeyError on the missing key.
            desc = rj['desc'] if 'desc' in rj else "unknown REST error"
            raise taos.error.ProgrammingError(desc, errno)

        if rj['status'] != 'succ':  # better be this
            raise RuntimeError(
                "Unexpected REST return status: {}".format(
                    rj['status']))

        nRows = rj['rows'] if ('rows' in rj) else 0
        self._result = rj
        return nRows

    def execute(self, sql):
        """Execute ``sql`` and return the row count reported by the server.

        Raises:
            RuntimeError: if the connection is not open.
        """
        if not self.isOpen:
            raise RuntimeError(
                "Cannot execute database commands until connection is open")
        logger.debug("[SQL-REST] Executing SQL: {}".format(sql))
        nRows = self._doSql(sql)
        logger.debug(
            "[SQL-REST] Execution Result, nRows = {}, SQL = {}".format(nRows, sql))
        return nRows

    def query(self, sql):  # return rows affected
        """Same as ``execute()`` for REST."""
        return self.execute(sql)

    def getQueryResult(self):
        """Return the 'data' member of the last successful response."""
        return self._result['data']

    def getResultRows(self):
        """Not implemented yet: prints the last response and raises RuntimeError."""
        print(self._result)
        raise RuntimeError("TBD")

    def getResultCols(self):
        """Not implemented yet: prints the last response and raises RuntimeError."""
        print(self._result)
        raise RuntimeError("TBD")
S
Shuduo Sang 已提交
730

731
    # Duplicate code from TDMySQL, TODO: merge all this into DbConnNative
732 733


734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761
class MyTDSql:
    """Small cursor wrapper that remembers the outcome of the last statement.

    queryRows / queryCols describe the last query() result set, affectedRows
    is whatever the cursor's execute() returned for the last execute(), and
    queryResult (set by query()) holds the rows that were fetched.
    """

    def __init__(self):
        self.affectedRows = 0
        self.queryCols = 0
        self.queryRows = 0

    def init(self, cursor, log=True):
        """Attach the cursor to use; `log` is accepted but currently unused."""
        self.cursor = cursor

    def close(self):
        """Close the attached cursor."""
        self.cursor.close()

    def query(self, sql):
        """Execute `sql`, fetch every row, and return the number of rows.

        Any exception from the cursor propagates to the caller unchanged.
        """
        self.sql = sql
        cur = self.cursor
        cur.execute(sql)
        self.queryResult = cur.fetchall()
        self.queryRows = len(self.queryResult)
        self.queryCols = len(cur.description)
        return self.queryRows

    def execute(self, sql):
        """Execute `sql` and return the value the cursor's execute() returned.

        Any exception from the cursor propagates to the caller unchanged.
        """
        self.sql = sql
        self.affectedRows = self.cursor.execute(sql)
        return self.affectedRows

S
Shuduo Sang 已提交
774

775
class DbConnNative(DbConn):
    """DbConn implementation that talks to the server through the `taos` module."""

    # Class variables
    _lock = threading.Lock()  # serializes opening of connections across threads
    _connInfoDisplayed = False  # so the connection banner is logged only once

    def __init__(self):
        super().__init__()
        self._type = self.TYPE_NATIVE
        self._conn = None
        self._cursor = None

    def getBuildPath(self):
        """Locate the build directory by searching for a `taosd` binary.

        Walks the project tree (the part of this file's path before
        "community", or before "tests" otherwise) and returns the path of the
        first directory containing `taosd`, minus a trailing "/build/bin"-sized
        suffix. Directories whose real parent path contains "packaging" are
        ignored.

        Raises RuntimeError when no suitable `taosd` is found (previously this
        surfaced as an UnboundLocalError on `buildPath`).
        """
        selfPath = os.path.dirname(os.path.realpath(__file__))
        if ("community" in selfPath):
            projPath = selfPath[:selfPath.find("communit")]
        else:
            projPath = selfPath[:selfPath.find("tests")]

        for root, dirs, files in os.walk(projPath):
            if ("taosd" in files):
                rootRealPath = os.path.dirname(os.path.realpath(root))
                if ("packaging" not in rootRealPath):
                    return root[:len(root) - len("/build/bin")]

        raise RuntimeError(
            "Cannot find a 'taosd' binary under: {}".format(projPath))

    def openByType(self):  # Open connection
        """Open the native connection and cursor, then wrap the cursor in MyTDSql."""
        cfgPath = self.getBuildPath() + "/test/cfg"
        hostAddr = "127.0.0.1"

        with self._lock:  # force single threading for opening DB connections
            if not self._connInfoDisplayed:
                self.__class__._connInfoDisplayed = True  # updating CLASS variable
                logger.info("Initiating TAOS native connection to {}, using config at {}".format(hostAddr, cfgPath))

            self._conn = taos.connect(host=hostAddr, config=cfgPath)  # TODO: make configurable
            self._cursor = self._conn.cursor()

        self._cursor.execute('reset query cache')
        # self._cursor.execute('use db') # do this at the beginning of every

        self._tdSql = MyTDSql()
        self._tdSql.init(self._cursor)

    def close(self):
        """Close the cursor and the underlying connection.

        Raises RuntimeError if the connection is not open.
        """
        if (not self.isOpen):
            raise RuntimeError(
                "Cannot clean up database until connection is open")
        self._tdSql.close()  # closes the cursor
        if self._conn is not None:
            # The connection was previously left open here, leaking one
            # client connection per open/close cycle.
            self._conn.close()
        logger.debug("[DB] Database connection closed")
        self.isOpen = False

    def execute(self, sql):
        """Execute `sql` and return the value reported by the cursor.

        Raises RuntimeError if the connection is not open.
        """
        if (not self.isOpen):
            raise RuntimeError(
                "Cannot execute database commands until connection is open")
        logger.debug("[SQL] Executing SQL: {}".format(sql))
        nRows = self._tdSql.execute(sql)
        logger.debug(
            "[SQL] Execution Result, nRows = {}, SQL = {}".format(
                nRows, sql))
        return nRows

    def query(self, sql):  # return rows affected
        """Run `sql` as a query and return the number of rows fetched.

        The rows themselves are available afterwards via getQueryResult().
        Raises RuntimeError if the connection is not open.
        """
        if (not self.isOpen):
            raise RuntimeError(
                "Cannot query database until connection is open")
        logger.debug("[SQL] Executing SQL: {}".format(sql))
        nRows = self._tdSql.query(sql)
        logger.debug(
            "[SQL] Query Result, nRows = {}, SQL = {}".format(
                nRows, sql))
        return nRows

    def getQueryResult(self):
        """Return the rows fetched by the last query()."""
        return self._tdSql.queryResult

    def getResultRows(self):
        """Return the row count of the last query()."""
        return self._tdSql.queryRows

    def getResultCols(self):
        """Return the column count of the last query()."""
        return self._tdSql.queryCols
861

S
Shuduo Sang 已提交
862

863
class AnyState:
    """Base class for the database states the test driver can observe.

    Each subclass supplies getInfo(), a list whose slots are addressed by the
    index constants below: the state value first, then one capability flag
    per kind of operation.
    """

    STATE_INVALID = -1
    STATE_EMPTY = 0  # nothing there, not even a DB
    STATE_DB_ONLY = 1  # we have a DB, but nothing else
    STATE_TABLE_ONLY = 2  # we have a table, but totally empty
    STATE_HAS_DATA = 3  # we have some data in the table
    _stateNames = ["Invalid", "Empty", "DB_Only", "Table_Only", "Has_Data"]

    # Slot positions inside the list returned by getInfo()
    STATE_VAL_IDX = 0
    CAN_CREATE_DB = 1
    CAN_DROP_DB = 2
    CAN_CREATE_FIXED_SUPER_TABLE = 3
    CAN_DROP_FIXED_SUPER_TABLE = 4
    CAN_ADD_DATA = 5
    CAN_READ_DATA = 6

    def __init__(self):
        self._info = self.getInfo()

    def __str__(self):
        # Names are stored shifted by one so that STATE_INVALID (-1) maps to
        # index 0.
        stateVal = self._info[self.STATE_VAL_IDX]
        return self._stateNames[stateVal + 1]

    def getInfo(self):
        """Return the state's info list; subclasses must provide it."""
        raise RuntimeError("Must be overriden by child classes")

    def equals(self, other):
        """Compare against a raw state value (int) or another AnyState.

        Raises RuntimeError for any other operand type.
        """
        if isinstance(other, int):
            otherVal = other
        elif isinstance(other, AnyState):
            otherVal = other.getValIndex()
        else:
            raise RuntimeError(
                "Unexpected comparison, type = {}".format(
                    type(other)))
        return self.getValIndex() == otherVal

    def verifyTasksToState(self, tasks, newState):
        """Check that `tasks` can explain the move to `newState` (subclass hook)."""
        raise RuntimeError("Must be overriden by child classes")

    def getValIndex(self):
        return self._info[self.STATE_VAL_IDX]

    def getValue(self):
        return self._info[self.STATE_VAL_IDX]

    def canCreateDb(self):
        return self._info[self.CAN_CREATE_DB]

    def canDropDb(self):
        return self._info[self.CAN_DROP_DB]

    def canCreateFixedSuperTable(self):
        return self._info[self.CAN_CREATE_FIXED_SUPER_TABLE]

    def canDropFixedSuperTable(self):
        return self._info[self.CAN_DROP_FIXED_SUPER_TABLE]

    def canAddData(self):
        return self._info[self.CAN_ADD_DATA]

    def canReadData(self):
        return self._info[self.CAN_READ_DATA]

    def assertAtMostOneSuccess(self, tasks, cls):
        """Raise RuntimeError as soon as a second successful `cls` task is seen."""
        successes = 0
        for task in tasks:
            if isinstance(task, cls) and task.isSuccess():
                successes += 1
                if successes >= 2:
                    raise RuntimeError(
                        "Unexpected more than 1 success with task: {}".format(cls))

    def assertIfExistThenSuccess(self, tasks, cls):
        """Raise RuntimeError if `cls` tasks exist but none of them succeeded."""
        candidates = [task for task in tasks if isinstance(task, cls)]
        if candidates and not any(task.isSuccess() for task in candidates):
            raise RuntimeError(
                "Unexpected zero success for task: {}".format(cls))

    def assertNoTask(self, tasks, cls):
        """Raise CrashGenError if any task of type `cls` is present."""
        if any(isinstance(task, cls) for task in tasks):
            raise CrashGenError(
                "This task: {}, is not expected to be present, given the success/failure of others".format(cls.__name__))

    def assertNoSuccess(self, tasks, cls):
        """Raise RuntimeError if any task of type `cls` succeeded."""
        for task in tasks:
            if isinstance(task, cls) and task.isSuccess():
                raise RuntimeError(
                    "Unexpected successful task: {}".format(cls))

    def hasSuccess(self, tasks, cls):
        """Return True if at least one task of type `cls` succeeded."""
        return any(isinstance(task, cls) and task.isSuccess() for task in tasks)

    def hasTask(self, tasks, cls):
        """Return True if at least one task of type `cls` is present."""
        return any(isinstance(task, cls) for task in tasks)

S
Shuduo Sang 已提交
978

979 980 981 982
class StateInvalid(AnyState):
    """Placeholder state carrying STATE_INVALID; every capability is off."""

    def getInfo(self):
        canCreateDb = canDropDb = False
        canCreateTable = canDropTable = False  # fixed (super) table
        canInsert = canRead = False  # data in the fixed table
        return [
            self.STATE_INVALID,
            canCreateDb, canDropDb,
            canCreateTable, canDropTable,
            canInsert, canRead,
        ]

    # def verifyTasksToState(self, tasks, newState):

S
Shuduo Sang 已提交
990

991 992 993 994
class StateEmpty(AnyState):
    """State with no database at all: the only permitted action is creating one."""

    def getInfo(self):
        canCreateDb, canDropDb = True, False
        canCreateTable = canDropTable = False  # fixed (super) table
        canInsert = canRead = False  # data in the fixed table
        return [
            self.STATE_EMPTY,
            canCreateDb, canDropDb,
            canCreateTable, canDropTable,
            canInsert, canRead,
        ]

    def verifyTasksToState(self, tasks, newState):
        """Sanity-check create-DB results for a step that started from EMPTY."""
        if not self.hasSuccess(tasks, TaskCreateDb):
            return  # nobody managed to create a DB, nothing to verify
        if self.hasTask(tasks, TaskDropDb):
            return  # drops were in play, so repeated creations are legitimate
        # Without any drop, at most one creation can succeed.
        # TODO: compare numbers
        self.assertAtMostOneSuccess(tasks, TaskCreateDb)

1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017

class StateDbOnly(AnyState):
    """State where a database exists but holds no tables."""

    def getInfo(self):
        canCreateDb, canDropDb = False, True
        canCreateTable, canDropTable = True, False  # fixed (super) table
        canInsert = canRead = False  # data in the fixed table
        return [
            self.STATE_DB_ONLY,
            canCreateDb, canDropDb,
            canCreateTable, canDropTable,
            canInsert, canRead,
        ]

    def verifyTasksToState(self, tasks, newState):
        """Sanity-check drop-DB results for a step that started from DB_ONLY."""
        createAttempted = self.hasTask(tasks, TaskCreateDb)
        if not createAttempted:
            # only if we don't create any more
            self.assertAtMostOneSuccess(tasks, TaskDropDb)
        self.assertIfExistThenSuccess(tasks, TaskDropDb)
        # Deliberately no checks on create-super-table or add-data tasks:
        # "at most one successful creation" does not hold in massively
        # parallel runs, and add-data attempts may all fail. The earlier
        # commented-out per-task state bookkeeping was removed from here.

S
Shuduo Sang 已提交
1045

1046
class StateSuperTableOnly(AnyState):
    """State where the fixed super table exists (see STATE_TABLE_ONLY)."""

    def getInfo(self):
        canCreateDb, canDropDb = False, True
        canCreateTable, canDropTable = False, True  # fixed (super) table
        canInsert = canRead = True  # data in the fixed table
        return [
            self.STATE_TABLE_ONLY,
            canCreateDb, canDropDb,
            canCreateTable, canDropTable,
            canInsert, canRead,
        ]

    def verifyTasksToState(self, tasks, newState):
        """Look at drop/create super-table results; currently enforces nothing."""
        droppedTable = self.hasSuccess(tasks, TaskDropSuperTable)
        if droppedTable:  # we are able to drop the table
            # We must have recreated it afterwards.
            # NOTE(review): the result of this call is discarded, so nothing
            # is actually asserted here.
            self.hasSuccess(tasks, TaskCreateSuperTable)
        # Stricter checks (no drop when data was added/read, etc.) were tried
        # and disabled: they are not true in massively parallel cases.
        # TODO: need to revamp!!
1073

S
Shuduo Sang 已提交
1074

1075 1076 1077 1078 1079 1080 1081 1082 1083 1084
class StateHasData(AnyState):
    """State where the fixed super table exists and holds data."""

    def getInfo(self):
        canCreateDb, canDropDb = False, True
        canCreateTable, canDropTable = False, True  # fixed (super) table
        canInsert = canRead = True  # data in the fixed table
        return [
            self.STATE_HAS_DATA,
            canCreateDb, canDropDb,
            canCreateTable, canDropTable,
            canInsert, canRead,
        ]

    def verifyTasksToState(self, tasks, newState):
        """Check that `tasks` are consistent with moving from HAS_DATA to `newState`."""
        if newState.equals(AnyState.STATE_EMPTY):
            # NOTE(review): result discarded, so this call asserts nothing.
            self.hasSuccess(tasks, TaskDropDb)
            if not self.hasTask(tasks, TaskCreateDb):
                self.assertAtMostOneSuccess(tasks, TaskDropDb)  # TODO: dicey
            return

        if newState.equals(AnyState.STATE_DB_ONLY):  # in DB only
            if not self.hasTask(tasks, TaskCreateDb):  # without a create_db task
                # there must be no drop_db task either
                self.assertNoTask(tasks, TaskDropDb)
            # NOTE(review): result discarded, so this call asserts nothing.
            self.hasSuccess(tasks, TaskDropSuperTable)
            return

        # Anything else is treated as "should be STATE_HAS_DATA"
        if not self.hasTask(tasks, TaskCreateDb):  # only if we didn't create one
            # we shouldn't have dropped it
            self.assertNoTask(tasks, TaskDropDb)
        if not self.hasTask(tasks, TaskCreateSuperTable):  # if we didn't create the table
            # we should not have a task that drops it
            self.assertNoTask(tasks, TaskDropSuperTable)
S
Steven Li 已提交
1111

S
Shuduo Sang 已提交
1112

1113
class StateMechine:
    """Tracks the observed database state and picks task types that fit it."""

    def __init__(self, dbConn):
        self._dbConn = dbConn
        self._curState = self._findCurrentState()  # starting state
        # transition target probabilities, indexed with value of STATE_EMPTY,
        # STATE_DB_ONLY, etc.
        self._stateWeights = [1, 3, 5, 15]

    def getCurrentState(self):
        return self._curState

    def hasDatabase(self):
        return self._curState.canDropDb()  # ha, can drop DB means it has one

    # May be slow, use cautiously...
    def getTaskTypes(self):  # those that can run (directly/indirectly) from the current state
        """Return the task classes runnable from the current state.

        Includes classes that can begin directly from the current state, plus
        those that can begin from the end state of one of the direct ones.
        Raises RuntimeError when nothing qualifies.
        """
        allTaskClasses = StateTransitionTask.__subclasses__()  # all state transition tasks
        firstTaskTypes = [
            tc for tc in allTaskClasses if tc.canBeginFrom(self._curState)]

        # now we have all the tasks that can begin directly from the current
        # state, let's figure out the INDIRECT ones
        taskTypes = firstTaskTypes.copy()  # have to have these
        for task1 in firstTaskTypes:  # each task type gathered so far
            endState = task1.getEndState()  # figure the end state
            if endState is None:  # does not change end state
                continue  # no use, do nothing
            for tc in allTaskClasses:  # what task can further begin from there?
                # NOTE(review): only checked against the direct list, so an
                # indirect class may be appended more than once.
                if tc.canBeginFrom(endState) and (tc not in firstTaskTypes):
                    taskTypes.append(tc)  # gather it

        if len(taskTypes) <= 0:
            raise RuntimeError(
                "No suitable task types found for state: {}".format(
                    self._curState))
        logger.debug(
            "[OPS] Tasks found for state {}: {}".format(
                self._curState,
                [t.__name__ for t in taskTypes]))
        return taskTypes

    def _findCurrentState(self):
        """Query the database and return the matching state object."""
        dbc = self._dbConn
        ts = time.time()  # we use this to debug how fast/slow it is to do the various queries to find the current DB state
        if not dbc.hasDatabases():  # no database?!
            logger.debug("[STT] empty database found, between {} and {}".format(ts, time.time()))
            return StateEmpty()
        # did not do this when opening connection, and this is NOT the worker
        # thread, which does this on their own
        dbc.use("db")
        if not dbc.hasTables():  # no tables
            logger.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time()))
            return StateDbOnly()

        sTable = DbManager.getFixedSuperTable()
        # NOTE(review): judging by the helper's name and the log messages this
        # condition looks inverted ("has regular tables" -> SUPER_TABLE_ONLY).
        # hasRegTables() is not visible here, so behavior is left unchanged.
        if sTable.hasRegTables(dbc):
            logger.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time()))
            return StateSuperTableOnly()
        else:
            logger.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time()))
            return StateHasData()

    def transition(self, tasks):
        """Re-read the DB state after a step and verify `tasks` can explain it."""
        if (len(tasks) == 0):  # before 1st step, or otherwise empty
            logger.debug("[STT] Starting State: {}".format(self._curState))
            return  # do nothing

        # this should show up in the server log, separating steps
        self._dbConn.execute("show dnodes")

        # Generic Checks, first based on the start state
        if self._curState.canCreateDb():
            self._curState.assertIfExistThenSuccess(tasks, TaskCreateDb)
            # "at most one success" is not asserted: multiple creations and
            # drops may happen within a step

        if self._curState.canDropDb():
            self._curState.assertIfExistThenSuccess(tasks, TaskDropDb)
            # "at most one success" is not asserted: drop-create-drop is
            # possible within a step

        # Nothing can be asserted generically for table creation/removal or
        # for adding/reading data: the whole DB may have been dropped meanwhile.

        newState = self._findCurrentState()
        logger.debug("[STT] New DB state determined: {}".format(newState))
        # can old state move to new state through the tasks?
        self._curState.verifyTasksToState(tasks, newState)
        self._curState = newState

    def pickTaskType(self):
        """Pick one runnable task class at random, weighted by its end state."""
        # all the task types we can choose from at current state
        taskTypes = self.getTaskTypes()
        weights = []
        for tt in taskTypes:
            endState = tt.getEndState()
            if endState is not None:
                # TODO: change to a method
                weights.append(self._stateWeights[endState.getValIndex()])
            else:
                # read data task, default to 10: TODO: change to a constant
                weights.append(10)
        i = self._weighted_choice_sub(weights)
        return taskTypes[i]

    # ref:
    # https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/
    def _weighted_choice_sub(self, weights):
        """Return a random index into `weights`, proportionally to each weight.

        Raises ValueError for an empty `weights`. If the running subtraction
        never goes negative (all-zero weights, or floating point rounding),
        the last index is returned instead of falling through with None.
        """
        if not weights:
            raise ValueError("Cannot choose from an empty list of weights")
        # TODO: use our dice to ensure it being deterministic?
        rnd = random.random() * sum(weights)
        for i, w in enumerate(weights):
            rnd -= w
            if rnd < 0:
                return i
        return len(weights) - 1
1250

1251
# Manager of the Database Data/Connection
S
Shuduo Sang 已提交
1252 1253 1254 1255


class DbManager():
    """Owns the DB connection, the state machine and the shared value generators."""

    def __init__(self, resetDb=True):
        self.tableNumQueue = LinearQueue()
        # datetime.datetime(2019, 1, 1) # initial date time tick
        self._lastTick = self.setupLastTick()
        self._lastInt = 0  # next one is initial integer
        self._lock = threading.RLock()

        # self.openDbServerConnection()
        if gConfig.connector_type == 'native':
            self._dbConn = DbConn.createNative()
        else:
            self._dbConn = DbConn.createRest()

        try:
            self._dbConn.open()  # may throw taos.error.ProgrammingError: disconnected
        except taos.error.ProgrammingError as err:
            if err.msg != 'client disconnected':
                raise
            # cannot open DB connection
            print(
                "Cannot establish DB connection, please re-run script without parameter, and follow the instructions.")
            sys.exit(2)
        except BaseException:
            print("[=] Unexpected exception")
            raise

        if resetDb:
            self._dbConn.resetDb()  # drop and recreate DB

        # Do this after dbConn is in proper shape
        self._stateMachine = StateMechine(self._dbConn)

    def getDbConn(self):
        return self._dbConn

    def getStateMachine(self) -> StateMechine:
        return self._stateMachine

    # We aim to create a starting time tick, such that, whenever we run our test here once
    # We should be able to safely create 100,000 records, which will not have any repeated time stamp
    # when we re-run the test in 3 minutes (180 seconds), basically we should expand time duration
    # by a factor of 500.
    # TODO: what if it goes beyond 10 years into the future
    # TODO: fix the error as result of above: "tsdb timestamp is out of range"
    def setupLastTick(self):
        """Compute and return the datetime from which generated ticks start."""
        epochStart = datetime.datetime(2020, 6, 1)
        rightNow = datetime.datetime.now()
        # maybe a very large number, takes 69 years to exceed Python int range
        elapsedSec = int(rightNow.timestamp() - epochStart.timestamp())
        # Wrap the elapsed seconds, then stretch by 500. The modulus is
        # 8 * 12 * 30 days' worth of seconds divided by 500 (the original
        # comment described this as "seconds within 10 years").
        wrapPeriod = 8 * 12 * 30 * 24 * 60 * 60 / 500
        stretchedSec = (elapsedSec % wrapPeriod) * 500
        baseTime = datetime.datetime(2012, 1, 1)  # default "keep" is 10 years
        startTick = datetime.datetime.fromtimestamp(
            baseTime.timestamp() + stretchedSec)  # see explanation above
        logger.info("Setting up TICKS to start from: {}".format(startTick))
        return startTick

    def pickAndAllocateTable(self):  # pick any table, and "use" it
        return self.tableNumQueue.pickAndAllocate()

    def addTable(self):
        with self._lock:
            return self.tableNumQueue.push()

    @classmethod
    def getFixedSuperTableName(cls):
        return "fs_table"

    @classmethod
    def getFixedSuperTable(cls):
        return TdSuperTable(cls.getFixedSuperTableName())

    def releaseTable(self, i):  # return the table back, so others can use it
        self.tableNumQueue.release(i)

    def getNextTick(self):
        """Return the next timestamp; 1 time in 10 an older one is handed out instead."""
        with self._lock:  # prevent duplicate tick
            if Dice.throw(10) == 0:  # 1 in 10 chance
                # 100 seconds before the last tick; the last tick is not advanced
                return self._lastTick + datetime.timedelta(seconds=-100)
            # regular: add one second to it
            self._lastTick += datetime.timedelta(seconds=1)
            return self._lastTick

    def getNextInt(self):
        with self._lock:
            self._lastInt += 1
            return self._lastInt

    def getNextBinary(self):
        suffix = self.getNextInt()
        return "Beijing_Shanghai_Los_Angeles_New_York_San_Francisco_Chicago_Beijing_Shanghai_Los_Angeles_New_York_San_Francisco_Chicago_{}".format(
            suffix)

    def getNextFloat(self):
        return 0.9 + self.getNextInt()

    def getTableNameToDelete(self):
        """Pop a table number and return its table name, or False if pop() gave a falsy value."""
        tblNum = self.tableNumQueue.pop()  # TODO: race condition!
        if tblNum:
            return "table_{}".format(tblNum)
        return False  # maybe false

    def cleanUp(self):
        self._dbConn.close()

1366

1367
class TaskExecutor():
    """Runs tasks for one step and records data marks in a shared bounded list."""

    class BoundedList:
        """Ascending list that keeps at most `size` of the values added to it."""

        def __init__(self, size=10):
            self._size = size
            self._list = []

        def add(self, n: int):
            """Insert `n` in sorted position, evicting the smallest item when full.

            A value not greater than the current first item is ignored.
            """
            items = self._list
            if not items:  # empty
                items.append(n)
                return

            # Index of the first item that is >= n, or the end of the list.
            insPos = next(
                (idx for idx, val in enumerate(items) if n <= val),
                len(items))

            if insPos == 0:  # except for the 1st item, # TODO: eliminate first item as gating item
                return  # do nothing

            items.insert(insPos, n)

            newLen = len(items)
            if newLen <= self._size:
                return  # still within bounds
            if newLen != (self._size + 1):
                raise RuntimeError("Corrupt Bounded List")
            del items[0]  # remove the first item

        def __str__(self):
            return repr(self._list)

    _boundedList = BoundedList()  # one list shared by all executors

    def __init__(self, curStep):
        self._curStep = curStep

    @classmethod
    def getBoundedList(cls):
        return cls._boundedList

    def getCurStep(self):
        return self._curStep

    def execute(self, task: Task, wt: WorkerThread):  # execute a task on a thread
        task.execute(wt)

    def recordDataMark(self, n: int):
        self._boundedList.add(n)

1422 1423
    # def logInfo(self, msg):
    #     logger.info("    T[{}.x]: ".format(self._curStep) + msg)
1424

1425 1426
    # def logDebug(self, msg):
    #     logger.debug("    T[{}.x]: ".format(self._curStep) + msg)
1427

S
Shuduo Sang 已提交
1428

S
Steven Li 已提交
1429
class Task():
    """Base class for a unit of work executed on a worker thread.

    A task gets a unique serial number, runs its subclass-specific
    ``_executeInternal``, classifies any exception that comes out of it, and
    reports begin/end/outcome to the shared ``ExecutionStats`` object.
    """
    taskSn = 100  # last serial number handed out, shared by all subclasses

    @classmethod
    def allocTaskNum(cls):
        """Allocate and return the next task serial number."""
        # IMPORTANT: cannot use cls.taskSn, since each sub class would then
        # get its own copy of the counter
        Task.taskSn += 1
        return Task.taskSn

    def __init__(self, dbManager: DbManager, execStats: ExecutionStats):
        """Create a task.

        dbManager: object used by execSql() and by subclasses.
        execStats: statistics collector notified by execute().
        """
        self._dbManager = dbManager
        self._workerThread = None  # set when execute() is called
        self._err = None  # exception captured by the last execute(), if any
        self._aborted = False
        self._curStep = None
        self._numRows = None  # Number of rows affected

        # Assign an incremental task serial number
        self._taskNum = self.allocTaskNum()

        self._execStats = execStats
        self._lastSql = ""  # last SQL executed/attempted

    def isSuccess(self):
        """Return True when the last execution recorded no error."""
        return self._err is None

    def isAborted(self):
        """Return True when an unexpected error marked this task as aborted."""
        return self._aborted

    def clone(self):  # TODO: why do we need this again?
        """Return a fresh task of the same class with the same collaborators."""
        return self.__class__(self._dbManager, self._execStats)

    def logDebug(self, msg):
        """Log ``msg`` at debug level through the worker thread, tagged with step/task number."""
        self._workerThread.logDebug(
            "Step[{}.{}] {}".format(
                self._curStep, self._taskNum, msg))

    def logInfo(self, msg):
        """Log ``msg`` at info level through the worker thread, tagged with step/task number."""
        self._workerThread.logInfo(
            "Step[{}.{}] {}".format(
                self._curStep, self._taskNum, msg))

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        """Do the actual work of the task; must be overridden by subclasses.

        Raises RuntimeError in this base implementation.
        """
        raise RuntimeError(
            "To be implemeted by child classes, class name: {}".format(
                self.__class__.__name__))

    def _isErrAcceptable(self, errno, msg):
        """Return True when a TAOS error (number + message) is tolerated.

        errno: error number, already converted to its positive form.
        msg: the error's message text, inspected only for errno 0x200.
        """
        if errno in [
                0x05,  # TSDB_CODE_RPC_NOT_READY
                # 0x200, # invalid SQL, TODO: re-examine with TD-934
                0x360, 0x362,
                0x369,  # tag already exists
                0x36A, 0x36B, 0x36D,
                0x381,
                0x380,  # "db not selected"
                0x383,
                0x386,  # DB is being dropped?!
                0x503,
                0x510,  # vnode not in ready state
                0x600,
                1000  # REST catch-all error
        ]:
            return True  # These are the ALWAYS-ACCEPTABLE ones

        if errno == 0x200:  # invalid SQL, we need to dive in a bit more
            acceptableFragments = (
                "invalid column name",
                "tags number not matched",  # mismatched tags after modification
                "duplicated column names",  # also alter table tag issues
            )
            if any(fragment in msg for fragment in acceptableFragments):
                return True

        return False  # Not an acceptable error

    def execute(self, wt: WorkerThread):
        """Run the task on worker thread ``wt`` and record the outcome.

        Exceptions raised by ``_executeInternal`` are captured in ``self._err``
        rather than propagated: TAOS errors are either tolerated (when the user
        chose to continue, or the error is acceptable) or mark the task as
        aborted; any other exception marks the task as aborted and prints a
        traceback. Statistics are updated in every case.
        """
        wt.verifyThreadSelf()
        self._workerThread = wt  # type: ignore

        te = wt.getTaskExecutor()
        self._curStep = te.getCurStep()
        self.logDebug(
            "[-] executing task {}...".format(self.__class__.__name__))

        self._err = None
        self._execStats.beginTaskType(
            self.__class__.__name__)  # mark beginning
        try:
            self._executeInternal(te, wt)  # TODO: no return value?
        except taos.error.ProgrammingError as err:
            # correct error scheme: non-positive numbers are shifted by 0x80000000
            errno2 = err.errno if (
                err.errno > 0) else 0x80000000 + err.errno
            if (gConfig.continue_on_exception):  # user choose to continue
                self.logDebug("[=] Continue after TAOS exception: errno=0x{:X}, msg: {}, SQL: {}".format(
                    errno2, err, self._lastSql))
                self._err = err
            elif self._isErrAcceptable(errno2, str(err)):
                self.logDebug("[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(
                    errno2, err, self._lastSql))
                print("_", end="", flush=True)
                self._err = err
            else:  # not an acceptable error
                errMsg = "[=] Unexpected Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(
                    errno2, err, self._lastSql)
                self.logDebug(errMsg)
                if gConfig.debug:
                    # print the full stack so the failure can be diagnosed
                    traceback.print_exc()
                print(
                    "\n\n----------------------------\nProgram ABORTED Due to Unexpected TAOS Error: \n\n{}\n".format(errMsg) +
                    "----------------------------\n")
                self._err = err
                self._aborted = True
        except Exception as e:
            self.logInfo("Non-TAOS exception encountered")
            self._err = e
            self._aborted = True
            traceback.print_exc()
        except BaseException as e:
            # NOTE: a second, identical `except BaseException` handler used to
            # follow this one; it could never be reached and has been removed.
            self.logInfo("Python base exception encountered")
            self._err = e
            self._aborted = True
            traceback.print_exc()
        self._execStats.endTaskType(self.__class__.__name__, self.isSuccess())

        self.logDebug("[X] task execution completed, {}, status: {}".format(
            self.__class__.__name__, "Success" if self.isSuccess() else "Failure"))
        # TODO: merge with above.
        self._execStats.incExecCount(self.__class__.__name__, self.isSuccess())

    def execSql(self, sql):
        """Remember ``sql`` as the last statement and execute it via the DB manager."""
        self._lastSql = sql
        return self._dbManager.execute(sql)

    def execWtSql(self, wt: WorkerThread, sql):
        """Remember ``sql`` as the last statement and execute it on the worker thread."""
        self._lastSql = sql
        return wt.execSql(sql)

    def queryWtSql(self, wt: WorkerThread, sql):
        """Remember ``sql`` as the last statement and run it as a query on the worker thread."""
        self._lastSql = sql
        return wt.querySql(sql)

    def getQueryResult(self, wt: WorkerThread):
        """Return the worker thread's current query result."""
        return wt.getQueryResult()


1583
class ExecutionStats:
    """Collects per-task-class execution counts and timing for a test run."""

    def __init__(self):
        """Start with empty counters, zeroed timers and no failure recorded."""
        # task class name -> [total executions, successful executions]
        self._execTimes: Dict[str, List[int]] = {}
        self._tasksInProgress = 0
        self._lock = threading.Lock()
        self._firstTaskStartTime = None  # set while at least one task is in progress
        self._execStartTime = None
        self._elapsedTime = 0.0  # total elapsed time
        self._accRunTime = 0.0  # accumulated run time

        self._failed = False
        self._failureReason = None

    def __str__(self):
        return "[ExecStats: _failed={}, _failureReason={}]".format(
            self._failed, self._failureReason)

    def isFailed(self):
        """Return True once registerFailure() has been called."""
        return self._failed

    def startExec(self):
        """Record the wall-clock start time of the run."""
        self._execStartTime = time.time()

    def endExec(self):
        """Record the elapsed wall-clock time since startExec()."""
        self._elapsedTime = time.time() - self._execStartTime

    def incExecCount(self, klassName, isSuccess):
        """Count one execution of task class ``klassName``.

        klassName: name of the task class that ran.
        isSuccess: when true, the success counter is incremented as well.
        """
        # The read-modify-write on the counters is guarded by the same lock
        # used for the in-progress bookkeeping.
        with self._lock:
            counts = self._execTimes.setdefault(klassName, [0, 0])
            counts[0] += 1  # index 0 has the "total" execution times
            if isSuccess:
                counts[1] += 1  # index 1 has the "success" execution times

    def beginTaskType(self, klassName):
        """Note that a task has started; starts the busy timer for the first one."""
        with self._lock:
            if self._tasksInProgress == 0:  # starting a new round
                self._firstTaskStartTime = time.time()  # I am now the first task
            self._tasksInProgress += 1

    def endTaskType(self, klassName, isSuccess):
        """Note that a task has ended; accumulates busy time when none remain."""
        with self._lock:
            self._tasksInProgress -= 1
            if self._tasksInProgress == 0:  # all tasks have stopped
                self._accRunTime += (time.time() - self._firstTaskStartTime)
                self._firstTaskStartTime = None

    def registerFailure(self, reason):
        """Mark the run as failed, remembering ``reason``."""
        self._failed = True
        self._failureReason = reason

    def printStats(self):
        """Log a summary of the collected statistics at info level."""
        logger.info(
            "----------------------------------------------------------------------")
        logger.info(
            "| Crash_Gen test {}, with the following stats:". format(
                "FAILED (reason: {})".format(
                    self._failureReason) if self._failed else "SUCCEEDED"))
        logger.info("| Task Execution Times (success/total):")
        execTimesAny = 0
        for k, n in self._execTimes.items():
            execTimesAny += n[0]
            logger.info("|    {0:<24}: {1}/{2}".format(k, n[1], n[0]))

        # Avoid a ZeroDivisionError when no task was executed at all
        avgTaskTime = self._accRunTime / execTimesAny if execTimesAny else 0.0

        logger.info(
            "| Total Tasks Executed (success or not): {} ".format(execTimesAny))
        logger.info(
            "| Total Tasks In Progress at End: {}".format(
                self._tasksInProgress))
        logger.info(
            "| Total Task Busy Time (elapsed time when any task is in progress): {:.3f} seconds".format(
                self._accRunTime))
        logger.info(
            "| Average Per-Task Execution Time: {:.3f} seconds".format(avgTaskTime))
        logger.info(
            "| Total Elapsed Time (from wall clock): {:.3f} seconds".format(
                self._elapsedTime))
        logger.info(
            "| Top numbers written: {}".format(
                TaskExecutor.getBoundedList()))
        logger.info(
            "----------------------------------------------------------------------")
1666 1667 1668


class StateTransitionTask(Task):
    """Base class for tasks that are selected according to a state.

    Subclasses are expected to override getEndState() and canBeginFrom();
    the implementations here only raise.
    """
    # Table/record counts used by subclasses, chosen via gConfig.larger_data
    LARGE_NUMBER_OF_TABLES = 35
    SMALL_NUMBER_OF_TABLES = 3
    LARGE_NUMBER_OF_RECORDS = 50
    SMALL_NUMBER_OF_RECORDS = 3

    _endState = None

    @classmethod
    def getInfo(cls):
        """Each sub class should supply its own information; raises here."""
        raise RuntimeError("Overriding method expected")

    @classmethod
    def getEndState(cls):  # TODO: optimize by calling it fewer times
        """Return the state reached after this task; must be overridden."""
        raise RuntimeError("Overriding method expected")

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        """Tell whether the task may start from ``state``; must be overridden."""
        raise RuntimeError("must be overriden")

    @classmethod
    def getRegTableName(cls, i):
        """Return the regular-table name for index ``i``."""
        return f"reg_table_{i}"

    def execute(self, wt: WorkerThread):
        """Execute the task on ``wt`` using the inherited implementation."""
        super().execute(wt)
S
Shuduo Sang 已提交
1702 1703


1704
class TaskCreateDb(StateTransitionTask):
    """State transition task that issues ``create database db``."""

    @classmethod
    def getEndState(cls):
        """Return a new StateDbOnly instance."""
        return StateDbOnly()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        """Allowed whenever ``state.canCreateDb()`` says so."""
        return state.canCreateDb()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        """Send the create-database statement through the worker thread."""
        createSql = "create database db"
        self.execWtSql(wt, createSql)

1716

1717
class TaskDropDb(StateTransitionTask):
    """State transition task that issues ``drop database db``."""

    @classmethod
    def getEndState(cls):
        """Return a new StateEmpty instance."""
        return StateEmpty()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        """Allowed whenever ``state.canDropDb()`` says so."""
        return state.canDropDb()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        """Drop the database, then log the time at which the drop returned."""
        self.execWtSql(wt, "drop database db")
        droppedAt = time.time()
        logger.debug("[OPS] database dropped at {}".format(droppedAt))
1729

S
Shuduo Sang 已提交
1730

1731
class TaskCreateSuperTable(StateTransitionTask):
    """State transition task that creates the fixed super table in ``db``."""

    @classmethod
    def getEndState(cls):
        """Return a new StateSuperTableOnly instance."""
        return StateSuperTableOnly()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        """Allowed whenever ``state.canCreateFixedSuperTable()`` says so."""
        return state.canCreateFixedSuperTable()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        """Create the super table, unless the worker thread has no DB in use."""
        if wt.dbInUse():
            superTableName = self._dbManager.getFixedSuperTableName()
            createSql = "create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(
                superTableName)
            # Regular tables are not created here; INSERT will do that
            # automatically
            self.execWtSql(wt, createSql)
        else:  # no DB yet, to the best of our knowledge
            logger.debug("Skipping task, no DB yet")
1752

S
Steven Li 已提交
1753

1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765 1766 1767 1768 1769 1770 1771 1772 1773 1774 1775 1776 1777 1778 1779 1780 1781
class TdSuperTable:
    """Helper around one super table, identified by its name."""

    def __init__(self, stName):
        """Remember the super table name ``stName``."""
        self._stName = stName

    def getRegTables(self, dbc: DbConn):
        """Return the first column of every row of ``select TBNAME`` on the super table.

        A TAOS ProgrammingError raised by the query is logged at debug level
        and re-raised.
        """
        listSql = "select TBNAME from db.{}".format(self._stName)
        try:
            dbc.query(listSql)  # TODO: analyze result set later
        except taos.error.ProgrammingError as err:
            # non-positive error numbers are shifted into the positive range
            errno2 = err.errno if err.errno > 0 else 0x80000000 + err.errno
            logger.debug("[=] Failed to get tables from super table: errno=0x{:X}, msg: {}".format(errno2, err))
            raise

        tableNames = []
        for row in dbc.getQueryResult():
            tableNames.append(row[0])
        return tableNames

    def hasRegTables(self, dbc: DbConn):
        """Return True when selecting from the super table yields a positive count."""
        rowCount = dbc.query("SELECT * FROM db.{}".format(self._stName))
        return rowCount > 0

    def ensureTable(self, dbc: DbConn, regTableName: str):
        """Create regular table ``regTableName`` under this super table if the lookup finds none."""
        lookupSql = "select tbname from {} where tbname in ('{}')".format(self._stName, regTableName)
        alreadyThere = dbc.query(lookupSql) >= 1
        if alreadyThere:  # reg table exists already
            return
        createSql = "CREATE TABLE {} USING {} tags ('{}', {})".format(
            regTableName, self._stName, 'xyz', '33')
        dbc.execute(createSql)

1782
class TaskReadData(StateTransitionTask):
    """Task that runs a randomly chosen select on every regular table."""

    @classmethod
    def getEndState(cls):
        """Return None, meaning the task doesn't affect state."""
        return None

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        """Allowed whenever ``state.canReadData()`` says so."""
        return state.canReadData()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        """Query each regular table of the fixed super table once.

        With a 1 in 5 chance the worker thread's connection is closed and
        re-opened first. A TAOS ProgrammingError from a select is logged at
        debug level and re-raised.
        """
        sTable = self._dbManager.getFixedSuperTable()

        # 1 in 5 chance, simulate a broken connection.
        # TODO: break connection in all situations
        simulateBrokenConn = random.randrange(5) == 0
        if simulateBrokenConn:
            wt.getDbConn().close()
            wt.getDbConn().open()

        regTables = sTable.getRegTables(self._dbManager.getDbConn())
        for rTbName in regTables:  # regular tables
            aggExpr = Dice.choice([
                '*', 'count(*)', 'avg(speed)',
                # 'twa(speed)', # TODO: this one REQUIRES a where statement, not reasonable
                'sum(speed)', 'stddev(speed)',
                'min(speed)', 'max(speed)', 'first(speed)', 'last(speed)',
            ])  # TODO: add more from 'top'
            readSql = "select {} from db.{}".format(aggExpr, rTbName)
            try:
                self.execWtSql(wt, readSql)
            except taos.error.ProgrammingError as err:
                errno2 = err.errno if err.errno > 0 else 0x80000000 + err.errno
                logger.debug("[=] Read Failure: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, self._lastSql))
                raise
S
Shuduo Sang 已提交
1810

1811
class TaskDropSuperTable(StateTransitionTask):
    """State transition task that drops the fixed super table."""

    @classmethod
    def getEndState(cls):
        """Return a new StateDbOnly instance."""
        return StateDbOnly()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        """Allowed whenever ``state.canDropFixedSuperTable()`` says so."""
        return state.canDropFixedSuperTable()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        """Optionally drop the regular tables one by one, then drop the super table.

        The per-table drops happen with a 1/2 chance, in shuffled order. Any
        TAOS ProgrammingError from a per-table drop is swallowed and the loop
        moves on to the next table; errno 0x362 additionally flips the
        success flag used for the one-time tick character.
        """
        dropOneByOne = Dice.throw(2) == 0
        if dropOneByOne:
            if gConfig.larger_data:
                tableCount = 2 + self.LARGE_NUMBER_OF_TABLES
            else:
                tableCount = 2 + self.SMALL_NUMBER_OF_TABLES
            tblSeq = list(range(tableCount))
            random.shuffle(tblSeq)

            tickPrinted = False  # whether the single "d"/"f" character was emitted
            allClean = True
            for idx in tblSeq:
                regTableName = self.getRegTableName(idx)  # "db.reg_table_{}".format(i)
                dropSql = "drop table {}".format(regTableName)
                try:
                    self.execWtSql(wt, dropSql)  # nRows always 0, like MySQL
                except taos.error.ProgrammingError as err:
                    # correcting for strange error number scheme
                    errno2 = err.errno if err.errno > 0 else 0x80000000 + err.errno
                    if errno2 == 0x362:  # mnode invalid table name
                        allClean = False
                        logger.debug(
                            "[DB] Acceptable error when dropping a table")
                    continue  # try to delete next regular table

                if tickPrinted:
                    continue
                tickPrinted = True  # Print only one time
                print("d" if allClean else "f", end="", flush=True)

        # Drop the super table itself
        superTableName = self._dbManager.getFixedSuperTableName()
        self.execWtSql(wt, "drop table db.{}".format(superTableName))
1855

S
Shuduo Sang 已提交
1856

1857 1858 1859
class TaskAlterTags(StateTransitionTask):
    """Task that issues one randomly chosen ``alter table ... tag`` statement."""

    @classmethod
    def getEndState(cls):
        """Return None, meaning the task doesn't affect state."""
        return None

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        """Allowed when the super table can be dropped: if we can drop it, we can alter tags."""
        return state.canDropFixedSuperTable()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        """Pick one of four tag alterations with ``Dice.throw(4)`` and execute it."""
        superTableName = self._dbManager.getFixedSuperTableName()
        dice = Dice.throw(4)
        templates = {
            0: "alter table db.{} add tag extraTag int",
            1: "alter table db.{} drop tag extraTag",
            2: "alter table db.{} drop tag newTag",
        }
        # any other value (dice == 3) renames the tag
        template = templates.get(
            dice, "alter table db.{} change tag extraTag newTag")
        self.execWtSql(wt, template.format(superTableName))
1880

S
Shuduo Sang 已提交
1881

1882
class TaskAddData(StateTransitionTask):
    """State transition task that inserts rows into the regular tables."""

    # Track which table is being actively worked on
    activeTable: Set[int] = set()

    # We use these two files to record operations to DB, useful for power-off
    # tests
    fAddLogReady = None
    fAddLogDone = None

    @classmethod
    def prepToRecordOps(cls):
        """Open the two operation log files once, when ``gConfig.record_ops`` is set."""
        if gConfig.record_ops:
            if (cls.fAddLogReady is None):
                logger.info(
                    "Recording in a file operations to be performed...")
                cls.fAddLogReady = open("add_log_ready.txt", "w")
            if (cls.fAddLogDone is None):
                logger.info("Recording in a file operations completed...")
                cls.fAddLogDone = open("add_log_done.txt", "w")

    @classmethod
    def getEndState(cls):
        """Return a new StateHasData instance."""
        return StateHasData()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        """Allowed whenever ``state.canAddData()`` says so."""
        return state.canAddData()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        """Insert a batch of records into every regular table, in shuffled order.

        For each table the index is marked active while it is being written;
        an "x" is printed when it was already marked. The marker is removed in
        a ``finally`` clause so that an exception during the inserts does not
        leave the table flagged as active. When ``gConfig.record_ops`` is set,
        each value is logged (flushed and fsync'ed) before and after the
        insert.
        """
        ds = self._dbManager
        tableCount = self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES
        recordCount = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS
        tblSeq = list(range(tableCount))
        random.shuffle(tblSeq)
        for i in tblSeq:
            if (i in self.activeTable):  # wow already active
                print("x", end="", flush=True)  # concurrent insertion
            else:
                self.activeTable.add(i)  # marking it active

            try:
                sTable = ds.getFixedSuperTable()
                regTableName = self.getRegTableName(i)  # "db.reg_table_{}".format(i)
                sTable.ensureTable(ds.getDbConn(), regTableName)  # Ensure the table exists

                for _ in range(recordCount):  # number of records per table
                    nextInt = ds.getNextInt()
                    if gConfig.record_ops:
                        self.prepToRecordOps()
                        self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName))
                        self.fAddLogReady.flush()
                        os.fsync(self.fAddLogReady)
                    sql = "insert into {} values ('{}', {});".format(
                        regTableName,
                        ds.getNextTick(), nextInt)
                    self.execWtSql(wt, sql)
                    # Successfully wrote the data into the DB, let's record it
                    # somehow
                    te.recordDataMark(nextInt)
                    if gConfig.record_ops:
                        self.fAddLogDone.write(
                            "Wrote {} to {}\n".format(
                                nextInt, regTableName))
                        self.fAddLogDone.flush()
                        os.fsync(self.fAddLogDone)
            finally:
                # discard() does not raise when the index is absent, unlike remove()
                self.activeTable.discard(i)
1948 1949


S
Steven Li 已提交
1950 1951
# Deterministic random number generator
class Dice():
    """Thin wrapper over the ``random`` module that insists on being seeded once."""

    seeded = False  # static, uninitialized

    @classmethod
    def seed(cls, s):  # static
        """Verify the RNG, seed it with ``s`` and mark the dice as seeded.

        Raises RuntimeError when called a second time.
        """
        if cls.seeded:
            raise RuntimeError(
                "Cannot seed the random generator more than once")
        cls.verifyRNG()
        random.seed(s)
        cls.seeded = True  # TODO: protect against multi-threading

    @classmethod
    def verifyRNG(cls):
        """Check that seeding with 0 yields the expected first three numbers.

        Raises RuntimeError otherwise. Note that this re-seeds ``random``.
        """
        random.seed(0)
        firstThree = [random.randrange(0, 1000) for _ in range(3)]
        if firstThree != [864, 394, 776]:
            raise RuntimeError("System RNG is not deterministic")

    @classmethod
    def throw(cls, stop):
        """Return a random integer from 0 to ``stop`` - 1."""
        return cls.throwRange(0, stop)

    @classmethod
    def throwRange(cls, start, stop):
        """Return a random integer from ``start`` up to ``stop`` - 1.

        Raises RuntimeError when the dice has not been seeded yet.
        """
        if cls.seeded:
            return random.randrange(start, stop)
        raise RuntimeError("Cannot throw dice before seeding it")

    @classmethod
    def choice(cls, cList):
        """Return a random element of ``cList``."""
        picked = random.choice(cList)
        return picked

S
Steven Li 已提交
1986

S
Steven Li 已提交
1987 1988
class LoggingFilter(logging.Filter):
    """Logging filter that currently lets every record through."""

    def filter(self, record: logging.LogRecord):
        """Return True for ``record``, whatever its level."""
        infoOrAbove = record.levelno >= logging.INFO
        if infoOrAbove:
            return True  # info or above always log

        # Message-prefix based filtering used to live here and is disabled,
        # so records below INFO are accepted as well.
        return True

S
Shuduo Sang 已提交
1998 1999

class MyLoggingAdapter(logging.LoggerAdapter):
    """Logger adapter that prefixes each message with a short thread id."""

    def process(self, msg, kwargs):
        """Return ``msg`` prefixed with ``[<thread ident modulo 10000>]`` and ``kwargs`` unchanged."""
        shortThreadId = threading.get_ident() % 10000
        prefixed = "[{}]{}".format(shortThreadId, msg)
        return prefixed, kwargs

S
Shuduo Sang 已提交
2004 2005

class SvcManager:
    """Starts, monitors and stops the TDengine service via a service manager thread."""

    def __init__(self):
        """Create a manager with no service manager thread yet."""
        print("Starting TDengine Service Manager")
        # Signal handlers are registered elsewhere (moved to MainExec)

        self.inSigHandler = False  # True while a signal handler is executing
        self.svcMgrThread = None

    def _doMenu(self):
        """Show the interrupt menu until the user enters "1", "2" or "3"; return that choice."""
        while True:
            print("\nInterrupting Service Program, Choose an Action: ")
            print("1: Resume")
            print("2: Terminate")
            print("3: Restart")
            # Remember to update the if range below
            choice = ""
            while choice == "":  # keep reading until something is entered
                choice = input("Enter Choice: ")
            if choice in ["1", "2", "3"]:
                return choice  # we are done with whole method
            print("Invalid choice, please try again.")

    def sigUsrHandler(self, signalNumber, frame):
        """Handle SIGUSR1: show the menu and resume, stop or restart the service.

        A signal arriving while a handler is already running is ignored. The
        in-handler flag is cleared even when the chosen action raises.
        """
        print("Interrupting main thread execution upon SIGUSR1")
        if self.inSigHandler:  # already
            print("Ignoring repeated SIG...")
            return  # do nothing if it's already not running
        self.inSigHandler = True
        try:
            choice = self._doMenu()
            if choice == "1":
                # TODO: can the sub-process be blocked due to us not reading from
                # queue?
                self.sigHandlerResume()
            elif choice == "2":
                self.stopTaosService()
            elif choice == "3":
                self.stopTaosService()
                self.startTaosService()
            else:
                raise RuntimeError("Invalid menu choice: {}".format(choice))
        finally:
            # always clear the flag, otherwise later signals would be ignored forever
            self.inSigHandler = False

    def sigIntHandler(self, signalNumber, frame):
        """Handle an INT/TERM signal by stopping the service.

        A signal arriving while a handler is already running is ignored. The
        in-handler flag is cleared even when stopping raises.
        """
        print("SvcManager: INT Signal Handler starting...")
        if self.inSigHandler:
            print("Ignoring repeated SIG_INT...")
            return
        self.inSigHandler = True
        try:
            self.stopTaosService()
            print("SvcManager: INT Signal Handler returning...")
        finally:
            self.inSigHandler = False

    def sigHandlerResume(self):
        """Announce that the service manager thread is being resumed."""
        print("Resuming TDengine service manager thread (main thread)...\n\n")

    def _checkServiceManagerThread(self):
        """Drop the service manager thread once it reports itself stopped."""
        if self.svcMgrThread:  # valid svc mgr thread
            if self.svcMgrThread.isStopped():  # done?
                self.svcMgrThread.procIpcBatch()  # one last time. TODO: appropriate?
                self.svcMgrThread = None  # no more

    def _procIpcAll(self):
        """Process IPC batches every half second until the service manager thread is gone."""
        while self.svcMgrThread:  # for as long as the svc mgr thread is still here
            self.svcMgrThread.procIpcBatch()  # regular processing,
            time.sleep(0.5)  # pause, before next round
            self._checkServiceManagerThread()
        print(
            "Service Manager Thread (with subprocess) has ended, main thread now exiting...")

    def startTaosService(self):
        """Kill any existing ``taosd`` process, then start a new service manager thread.

        Raises RuntimeError when a service manager thread already exists.
        """
        if self.svcMgrThread:
            raise RuntimeError("Cannot start TAOS service when one may already be running")

        # Find if there's already a taosd service, and then kill it
        for proc in psutil.process_iter():
            if proc.name() == 'taosd':
                print("Killing an existing TAOSD process in 2 seconds... press CTRL-C to interrupe")
                time.sleep(2.0)
                proc.kill()

        self.svcMgrThread = ServiceManagerThread()  # create the object
        self.svcMgrThread.start()
        print("TAOS service started, printing out output...")
        self.svcMgrThread.procIpcBatch(
            trimToTarget=10,
            forceOutput=True)  # for printing 10 lines
        print("TAOS service started")

    def stopTaosService(self, outputLines=20):
        """Stop the service manager thread, if any.

        outputLines: passed to the final ``procIpcBatch`` call once the thread
        reports itself stopped.
        """
        if not self.isRunning():
            logger.warning("Cannot stop TAOS service, not running")
            return

        print("Terminating Service Manager Thread (SMT) execution...")
        self.svcMgrThread.stop()
        if self.svcMgrThread.isStopped():
            self.svcMgrThread.procIpcBatch(outputLines)  # one last time
            self.svcMgrThread = None
            print("----- End of TDengine Service Output -----\n")
            print("SMT execution terminated")
        else:
            print("WARNING: SMT did not terminate as expected")

    def run(self):
        """Start the service, pump its messages until it ends, then make sure it is stopped."""
        self.startTaosService()
        self._procIpcAll()  # pump/process all the messages
        if self.isRunning():  # if sig handler hasn't destroyed it by now
            self.stopTaosService()  # should have started already

    def isRunning(self):
        """Return True while a service manager thread object is held."""
        return self.svcMgrThread is not None
2129

2130 2131 2132 2133 2134
class ServiceManagerThread:
    """Runs the TDengine service as a sub process and relays its output.

    A TdeSubProcess is started, and a daemon thread (svcOutputReader)
    reads its stdout line by line into an IPC queue. The owner drains
    that queue with procIpcBatch(). The life cycle is tracked in
    ``_status`` using the MainExec.STATUS_* constants.
    """
    MAX_QUEUE_SIZE = 10000
    # Line fragment in the service output that marks it as ready
    TD_READY_MSG = "TDengine is initialized successfully"
    _ProgressBars = ["--", "//", "||", "\\\\"]

    def __init__(self):
        self._tdeSubProcess = None
        self._thread = None
        self._status = None
        # Created up front so that draining before start() finds an empty
        # queue instead of a missing attribute; start() installs a fresh one.
        self._ipcQueue = Queue()

    def getStatus(self):
        """Return the current MainExec.STATUS_* value, or None before start()."""
        return self._status

    def isRunning(self):
        # return self._thread and self._thread.is_alive()
        return self._status == MainExec.STATUS_RUNNING

    def isStopping(self):
        return self._status == MainExec.STATUS_STOPPING

    def isStopped(self):
        return self._status == MainExec.STATUS_STOPPED

    def start(self):
        """Start the sub process and the reader thread, then wait for readiness.

        Polls once per second, up to 10 times, for the reader thread to
        flip the status to RUNNING.

        :raises RuntimeError: if a thread or sub process already exists, or
            if the service did not report readiness within the polling window.
        """
        if self._thread:
            raise RuntimeError("Unexpected _thread")
        if self._tdeSubProcess:
            raise RuntimeError("TDengine sub process already created/running")

        self._status = MainExec.STATUS_STARTING

        self._tdeSubProcess = TdeSubProcess()
        self._tdeSubProcess.start()

        self._ipcQueue = Queue()
        self._thread = threading.Thread(
            target=self.svcOutputReader,
            args=(self._tdeSubProcess.getStdOut(), self._ipcQueue))
        self._thread.daemon = True  # thread dies with the program
        self._thread.start()

        # Wait for the service to start; messages are deliberately not
        # pumped during start up.
        for _ in range(0, 10):
            time.sleep(1.0)
            print("_zz_", end="", flush=True)
            if self._status == MainExec.STATUS_RUNNING:
                logger.info("[] TDengine service READY to process requests")
                return  # now we've started
        # TODO: handle this better?
        # Show the last 20 messages before giving up
        self.procIpcBatch(20, True)
        raise RuntimeError("TDengine service did not start successfully")

    def stop(self):
        """Stop the sub process and, if that succeeded, join the reader thread.

        Can be called from both the main thread and a signal handler.
        Returns early when the service is already stopped or stopping.

        :raises RuntimeError: if there is no sub process object to stop.
        """
        print("Terminating TDengine service running as the sub process...")
        if self.isStopped():
            print("Service already stopped")
            return
        if self.isStopping():
            print("Service is already being stopped")
            return
        # Linux will send Control-C generated SIGINT to the TDengine process
        # already, ref:
        # https://unix.stackexchange.com/questions/176235/fork-and-how-signals-are-delivered-to-processes
        if not self._tdeSubProcess:
            raise RuntimeError("sub process object missing")

        self._status = MainExec.STATUS_STOPPING
        self._tdeSubProcess.stop()

        if self._tdeSubProcess.isRunning():  # still running
            # The process handle lives on the TdeSubProcess wrapper; this
            # class itself has no "subProcess" attribute.
            print(
                "FAILED to stop sub process, it is still running... pid = {}".format(
                    self._tdeSubProcess.subProcess.pid))
        else:
            self._tdeSubProcess = None  # not running any more
            self.join()  # stop the thread, change the status, etc.

    def join(self):
        """Join the reader thread and mark the service as stopped.

        :raises RuntimeError: if called while the status is not STOPPING.
        """
        # TODO: sanity check
        if not self.isStopping():
            raise RuntimeError(
                "Unexpected status when ending svc mgr thread: {}".format(
                    self._status))

        if self._thread:
            self._thread.join()
            self._thread = None
            self._status = MainExec.STATUS_STOPPED
        else:
            print("Joining empty thread, doing nothing")

    def _trimQueue(self, targetSize):
        """Discard the oldest queued lines until about targetSize remain.

        A targetSize of zero or less disables trimming.
        """
        if targetSize <= 0:
            return  # do nothing
        q = self._ipcQueue
        if q.qsize() <= targetSize:  # no need to trim
            return

        logger.debug("Triming IPC queue to target size: {}".format(targetSize))
        itemsToTrim = q.qsize() - targetSize
        for _ in range(0, itemsToTrim):
            try:
                q.get_nowait()
            except Empty:
                break  # queue drained by someone else, no more trimming

    def procIpcBatch(self, trimToTarget=0, forceOutput=False):
        """Log every line currently waiting in the IPC queue.

        :param trimToTarget: when positive, the queue is first trimmed to
            about this many lines (see _trimQueue).
        :param forceOutput: log lines at INFO level instead of DEBUG.
        """
        self._trimQueue(trimToTarget)  # trim if necessary
        # Process all the output generated by the underlying sub process,
        # managed by IO thread
        print("<", end="", flush=True)
        while True:
            try:
                line = self._ipcQueue.get_nowait()  # getting output at fast speed
            except Empty:
                # no more output, we are done with THIS BATCH
                print(".>", end="", flush=True)
                return
            self._printProgress("_o")
            if forceOutput:
                logger.info(line)
            else:
                logger.debug(line)

    def _printProgress(self, msg):  # TODO: assuming 2 chars
        """Print msg plus a randomly picked progress bar, then back up 4 columns."""
        print(msg, end="", flush=True)
        pBar = self._ProgressBars[Dice.throw(4)]
        print(pBar, end="", flush=True)
        print('\b\b\b\b', end="", flush=True)

    def svcOutputReader(self, out: IO, queue):
        """Thread body: copy lines from the service stdout into queue.

        Also flips the status from STARTING to RUNNING once a line
        containing TD_READY_MSG is seen. Returns when ``out`` reaches EOF.

        :param out: binary stream to read from (lines are decoded as UTF-8).
        :param queue: queue receiving the decoded, right-stripped lines.
        """
        # Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
        for line in iter(out.readline, b''):
            line = line.decode("utf-8").rstrip()
            # This might block, and then cause the "out" buffer to block
            queue.put(line)
            self._printProgress("_i")

            if self._status == MainExec.STATUS_STARTING:  # we are starting, let's see if we have started
                if line.find(self.TD_READY_MSG) != -1:  # found
                    self._status = MainExec.STATUS_RUNNING

            # Trim the queue if necessary: TODO: try this 1 out of 10 times
            self._trimQueue(self.MAX_QUEUE_SIZE * 9 // 10)  # trim to 90% size

            if self.isStopping():  # TODO: use thread status instead
                # WAITING for the stopping sub process to finish its output
                print("_w", end="", flush=True)

        # Reaching EOF means the sub process must have died
        print("\nNo more output from IO thread managing TDengine service")
        out.close()

2297 2298

class TdeSubProcess:
    """Thin wrapper around the ``taosd`` service running as a child process."""

    def __init__(self):
        self.subProcess = None  # the subprocess.Popen object while we hold one

    def getStdOut(self):
        """Return the stdout pipe of the child process."""
        return self.subProcess.stdout

    def isRunning(self):
        """Return True while a child process object is held."""
        return self.subProcess is not None

    def getBuildPath(self):
        """Locate the build directory containing ``build/bin/taosd``.

        Walks the project tree (derived from this file's location) for a
        directory holding a ``taosd`` file outside of any "packaging" path,
        and strips the trailing "/build/bin" from it.

        :raises RuntimeError: if no such file is found.
        """
        selfPath = os.path.dirname(os.path.realpath(__file__))
        if "community" in selfPath:
            projPath = selfPath[:selfPath.find("communit")]
        else:
            # NOTE(review): if "tests" is absent too, find() yields -1 and
            # this merely drops the last character of selfPath.
            projPath = selfPath[:selfPath.find("tests")]

        buildPath = None
        for root, dirs, files in os.walk(projPath):
            if "taosd" in files:
                rootRealPath = os.path.dirname(os.path.realpath(root))
                if "packaging" not in rootRealPath:
                    buildPath = root[:len(root) - len("/build/bin")]
                    break
        if buildPath is None:
            # Previously this surfaced as an UnboundLocalError
            raise RuntimeError(
                "Cannot find the taosd executable under {}".format(projPath))
        return buildPath

    def start(self):
        """Delete the old log files and launch ``taosd`` with a stdout pipe.

        :raises RuntimeError: if a child process is already held, or if the
            build directory cannot be located.
        """
        # Imported here because the module's visible import block does not
        # import subprocess directly.
        import subprocess

        # Check first, so a redundant start() does not wipe the log files
        # of the service that is already there.
        if self.subProcess:  # already there
            raise RuntimeError("Corrupt process state")

        ON_POSIX = 'posix' in sys.builtin_module_names

        # getBuildPath() walks the whole project tree, so resolve it once
        buildPath = self.getBuildPath()
        taosdPath = buildPath + "/build/bin/taosd"
        cfgPath = buildPath + "/test/cfg"

        # Delete the log files
        logPath = buildPath + "/test/log"
        # ref: https://stackoverflow.com/questions/1995373/deleting-all-files-in-a-directory-with-python/1995397
        for f in os.listdir(logPath):
            filePath = os.path.join(logPath, f)
            print("Removing log file: {}".format(filePath))
            os.remove(filePath)

        svcCmd = [taosdPath, '-c', cfgPath]
        # svcCmd = ['vmstat', '1']

        self.subProcess = subprocess.Popen(
            svcCmd,
            stdout=subprocess.PIPE,
            # bufsize=1, # not supported in binary mode
            close_fds=ON_POSIX)  # had text=True, which interfered with reading EOF

    def stop(self):
        """Stop the child process with SIGINT, waiting up to 10 seconds.

        The process handle is dropped once the process is known to have
        exited; it is kept if the wait times out.
        """
        import subprocess  # see start() for why this is a local import

        if not self.subProcess:
            print("Sub process already stopped")
            return

        retCode = self.subProcess.poll()
        # poll() yields None while the process is alive; any integer,
        # including 0 for a clean exit, means it has already ended.
        if retCode is not None:
            self.subProcess = None
            return

        # Process still alive, let's interrupt it
        print(
            "Sub process is running, sending SIG_INT and waiting for it to terminate...")
        # sub process should end, then IPC queue should end, causing IO
        # thread to end
        self.subProcess.send_signal(signal.SIGINT)
        try:
            self.subProcess.wait(10)
        except subprocess.TimeoutExpired:
            print("Time out waiting for TDengine service process to exit")
        else:
            print("TDengine service process terminated successfully from SIG_INT")
            self.subProcess = None

2371 2372 2373 2374 2375 2376 2377 2378 2379 2380 2381 2382 2383 2384 2385 2386 2387 2388 2389 2390 2391 2392 2393 2394 2395 2396 2397 2398
class ThreadStacks:  # stack info for all threads
    """Snapshot of the call stacks of all live threads, taken at construction."""

    def __init__(self):
        self._allStacks = {}  # thread id -> extracted stack summary
        allFrames = sys._current_frames()
        for th in threading.enumerate():
            # A thread started after the frame snapshot has no entry there
            frame = allFrames.get(th.ident)
            if frame is None:
                continue
            # native_id only exists on Python 3.8+; fall back to ident
            threadId = getattr(th, 'native_id', th.ident)
            self._allStacks[threadId] = traceback.extract_stack(frame)

    def print(self, filteredEndName = None, filterInternal = False):
        """Print the captured stacks to stdout.

        :param filteredEndName: when set, skip every stack whose innermost
            frame is a function of this name.
        :param filterInternal: when True, skip stacks whose innermost frame
            is one of a fixed list of internal function names.
        """
        for thNid, stack in self._allStacks.items():  # for each thread
            lastFrame = stack[-1]
            # Skip stacks ending in the function the caller asked to hide
            if filteredEndName and lastFrame.name == filteredEndName:
                continue
            if filterInternal:
                if lastFrame.name in ['wait', 'invoke_excepthook',
                    '_wait', # The Barrier exception
                    'svcOutputReader', # the svcMgr thread
                    '__init__']: # the thread that extracted the stack
                    continue # ignore
            # Now print
            print("\n<----- Thread Info for ID: {}".format(thNid))
            for frame in stack:
                print("File {filename}, line {lineno}, in {name}".format(
                    filename=frame.filename, lineno=frame.lineno, name=frame.name))
                print("    {}".format(frame.line))
            print("-----> End of Thread Info\n")
S
Shuduo Sang 已提交
2399

2400 2401 2402
class ClientManager:
    """Drives one client-side test run and reacts to user signals."""

    def __init__(self):
        print("Starting service manager")
        # signal.signal(signal.SIGTERM, self.sigIntHandler)
        # signal.signal(signal.SIGINT, self.sigIntHandler)

        self._status = MainExec.STATUS_RUNNING
        self.tc = None  # thread coordinator, created in run()

        self.inSigHandler = False  # guards against re-entering sigUsrHandler

    def sigIntHandler(self, signalNumber, frame):
        """Request a stop on the first SIGINT/SIGTERM, force exit on a repeat."""
        if self._status != MainExec.STATUS_RUNNING:
            print("Repeated SIGINT received, forced exit...")
            sys.exit(-1)
        self._status = MainExec.STATUS_STOPPING  # immediately set our status

        print("ClientManager: Terminating program...")
        if self.tc is None:
            # The signal arrived before run() created the coordinator, so
            # there is nothing to stop gracefully; exit instead of failing
            # on the missing object.
            sys.exit(-1)
        self.tc.requestToStop()

    def _doMenu(self):
        """Show the interrupt menu until a valid choice is entered.

        :return: one of the strings "1", "2" or "3".
        """
        while True:
            print("\nInterrupting Client Program, Choose an Action: ")
            print("1: Resume")
            print("2: Terminate")
            print("3: Show Threads")
            # Remember to update the accepted choices below
            choice = ""
            while choice == "":  # empty input just asks again
                choice = input("Enter Choice: ")
            if choice in ["1", "2", "3"]:
                return choice
            print("Invalid choice, please try again.")

    def sigUsrHandler(self, signalNumber, frame):
        """Handle SIGUSR1 by showing the menu and acting on the choice.

        :raises RuntimeError: if the menu returns an unknown choice.
        """
        print("Interrupting main thread execution upon SIGUSR1")
        if self.inSigHandler:  # already
            print("Ignoring repeated SIG_USR1...")
            return  # do nothing while the menu is already being handled
        self.inSigHandler = True

        try:
            choice = self._doMenu()
            if choice == "1":
                print("Resuming execution...")
                time.sleep(1.0)
            elif choice == "2":
                print("Not implemented yet")
                time.sleep(1.0)
            elif choice == "3":
                ts = ThreadStacks()
                ts.print()
            else:
                raise RuntimeError("Invalid menu choice: {}".format(choice))
        finally:
            # Always release the guard, otherwise one failure would make
            # every later SIGUSR1 be ignored.
            self.inSigHandler = False

    def _printLastNumbers(self):  # to verify data durability
        """Print a summary of the "speed" values currently stored in db."""
        dbManager = DbManager(resetDb=False)
        dbc = dbManager.getDbConn()
        if dbc.query("show databases") == 0:  # no database
            return
        if dbc.query("show tables") == 0:  # no tables
            return

        dbc.execute("use db")
        sTbName = dbManager.getFixedSuperTableName()

        # get all regular tables
        # TODO: analyze result set later
        dbc.query("select TBNAME from db.{}".format(sTbName))
        rTables = dbc.getQueryResult()

        bList = TaskExecutor.BoundedList()
        for rTbName in rTables:  # regular tables
            dbc.query("select speed from db.{}".format(rTbName[0]))
            numbers = dbc.getQueryResult()
            for row in numbers:
                bList.add(row[0])

        print("Top numbers in DB right now: {}".format(bList))
        print("TDengine client execution is about to start in 2 seconds...")
        time.sleep(2.0)

    def prepare(self):
        self._printLastNumbers()

    def run(self, svcMgr):
        """Execute the test run and return a process exit code.

        :param svcMgr: service manager to stop after the run; may be falsy
            when the service is not managed by this program.
        :return: 1 if the thread coordinator reports failure, otherwise 0.
        """
        self._printLastNumbers()

        dbManager = DbManager()  # Regular function
        thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps)
        self.tc = ThreadCoordinator(thPool, dbManager)

        self.tc.run()
        if svcMgr:  # gConfig.auto_start_service:
            svcMgr.stopTaosService()
        # Print exec status, etc., AFTER showing messages from the server
        self.conclude()
        # Linux return code: ref https://shapeshed.com/unix-exit-codes/
        return 1 if self.tc.isFailed() else 0

    def conclude(self):
        """Print the run statistics and clean up the database manager."""
        self.tc.printStats()
        self.tc.getDbManager().cleanUp()
2515 2516 2517


class MainExec:
    """Top-level executor: installs signal handlers and runs client or service."""

    # Life-cycle states shared by the service and client managers
    STATUS_STARTING = 1
    STATUS_RUNNING = 2
    STATUS_STOPPING = 3
    STATUS_STOPPED = 4

    def __init__(self):
        self._clientMgr = None
        self._svcMgr = None

        signal.signal(signal.SIGTERM, self.sigIntHandler)
        signal.signal(signal.SIGINT,  self.sigIntHandler)
        signal.signal(signal.SIGUSR1, self.sigUsrHandler)  # different handler!

    def sigUsrHandler(self, signalNumber, frame):
        """Forward SIGUSR1 to the client manager, else to the service manager."""
        if self._clientMgr:
            self._clientMgr.sigUsrHandler(signalNumber, frame)
        elif self._svcMgr:  # Only if no client mgr, we are running alone
            self._svcMgr.sigUsrHandler(signalNumber, frame)

    def sigIntHandler(self, signalNumber, frame):
        """Forward SIGINT/SIGTERM to every manager that exists, service first."""
        if self._svcMgr:
            self._svcMgr.sigIntHandler(signalNumber, frame)
        if self._clientMgr:
            self._clientMgr.sigIntHandler(signalNumber, frame)

    def runClient(self):
        """Run the client, optionally starting the service first.

        :return: the value returned by ClientManager.run(), so that the
            caller can use it as the process exit code.
        """
        if gConfig.auto_start_service:
            self._svcMgr = SvcManager()
            self._svcMgr.startTaosService()  # we start, don't run

        self._clientMgr = ClientManager()
        # The client manager stops the TAOS service itself. Its result is
        # passed on to the caller instead of being dropped.
        return self._clientMgr.run(self._svcMgr)

    def runService(self):
        """Run the service in the foreground until it reaches some end state."""
        self._svcMgr = SvcManager()
        self._svcMgr.run()  # run to some end state

    def runTemp(self):  # for debugging purposes
        """Placeholder for ad-hoc sandbox experiments; currently does nothing."""
        return

S
Steven Li 已提交
2598

2599
def main():
    """Parse the command line, set up logging, and run the service or client.

    Stores the parsed options in the global ``gConfig`` and the logging
    adapter in the global ``logger``.

    :return: the result of MainExec.runClient() in client mode; None when
        the TDengine service is run in the foreground (``-e``).
    """
    # Super cool Python argument library:
    # https://docs.python.org/3/library/argparse.html
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=textwrap.dedent('''\
            TDengine Auto Crash Generator (PLEASE NOTICE the Prerequisites Below)
            ---------------------------------------------------------------------
            1. You build TDengine in the top level ./build directory, as described in offical docs
            2. You run the server there before this script: ./build/bin/taosd -c test/cfg

            '''))

    # For options that take a value, the help text uses argparse's
    # %(default)s specifier so it cannot drift from the real default.
    parser.add_argument(
        '-a',
        '--auto-start-service',
        action='store_true',
        help='Automatically start/stop the TDengine service (default: false)')
    parser.add_argument(
        '-c',
        '--connector-type',
        action='store',
        default='native',
        type=str,
        help='Connector type to use: native, rest, or mixed (default: %(default)s)')
    parser.add_argument(
        '-d',
        '--debug',
        action='store_true',
        help='Turn on DEBUG mode for more logging (default: false)')
    parser.add_argument(
        '-e',
        '--run-tdengine',
        action='store_true',
        help='Run TDengine service in foreground (default: false)')
    parser.add_argument(
        '-l',
        '--larger-data',
        action='store_true',
        help='Write larger amount of data during write operations (default: false)')
    # NOTE(review): this help text seems at odds with the option name;
    # confirm against the code that reads per_thread_db_connection.
    parser.add_argument(
        '-p',
        '--per-thread-db-connection',
        action='store_true',
        help='Use a single shared db connection (default: false)')
    parser.add_argument(
        '-r',
        '--record-ops',
        action='store_true',
        help='Use a pair of always-fsynced fils to record operations performing + performed, for power-off tests (default: false)')
    parser.add_argument(
        '-s',
        '--max-steps',
        action='store',
        default=1000,
        type=int,
        help='Maximum number of steps to run (default: %(default)s)')
    parser.add_argument(
        '-t',
        '--num-threads',
        action='store',
        default=5,
        type=int,
        help='Number of threads to run (default: %(default)s)')
    parser.add_argument(
        '-x',
        '--continue-on-exception',
        action='store_true',
        help='Continue execution after encountering unexpected/disallowed errors/exceptions (default: false)')

    global gConfig
    gConfig = parser.parse_args()

    # Logging Stuff
    global logger
    _logger = logging.getLogger('CrashGen')  # real logger
    _logger.addFilter(LoggingFilter())
    ch = logging.StreamHandler()
    _logger.addHandler(ch)

    # Logging adapter, to be used as a logger
    logger = MyLoggingAdapter(_logger, [])

    if gConfig.debug:
        logger.setLevel(logging.DEBUG)  # default seems to be INFO
    else:
        logger.setLevel(logging.INFO)

    Dice.seed(0)  # initial seeding of dice

    # Run server or client
    mExec = MainExec()
    if gConfig.run_tdengine:  # run server
        mExec.runService()
    else:
        return mExec.runClient()
2716

S
Shuduo Sang 已提交
2717

2718
if __name__ == "__main__":
    # Use whatever main() returns as the process exit status
    sys.exit(main())