crash_gen.py 112.3 KB
Newer Older
S
Shuduo Sang 已提交
1
# -----!/usr/bin/python3.7
S
Steven Li 已提交
2 3 4 5 6 7 8 9 10 11 12 13
###################################################################
#           Copyright (c) 2016 by TAOS Technologies, Inc.
#                     All rights reserved.
#
#  This file is proprietary and confidential to TAOS Technologies.
#  No part of this file may be reproduced, stored, transmitted,
#  disclosed or used in any form or by any means other than as
#  expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-
S
Shuduo Sang 已提交
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
# For type hinting before definition, ref:
# https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel
from __future__ import annotations
import taos
import crash_gen
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.log import *
from queue import Queue, Empty
from typing import IO
from typing import Set
from typing import Dict
from typing import List
from requests.auth import HTTPBasicAuth
import textwrap
import datetime
import logging
import time
import random
import threading
import requests
import copy
import argparse
import getopt
39

S
Steven Li 已提交
40
import sys
41
import os
42 43
import io
import signal
44
import traceback
45 46 47 48 49 50 51

try:
    import psutil
except:
    print("Psutil module needed, please install: sudo pip3 install psutil")
    sys.exit(-1)

52 53 54 55
# Require Python 3
if sys.version_info.major < 3:
    # Refuse to continue on a pre-3 interpreter
    raise Exception("Must be using Python 3")

S
Steven Li 已提交
56

S
Shuduo Sang 已提交
57
# Global variables, tried to keep a small number.
58 59 60

# Command-line/Environment Configurations, will set a bit later
# ConfigNameSpace = argparse.Namespace
S
Shuduo Sang 已提交
61
# Command-line/environment configuration. The empty namespace is only a
# dummy value and will be replaced later.
gConfig = argparse.Namespace()
# TODO: refactor this hack, use dep injection
gSvcMgr = None
# Module-wide logger object; None until one is assigned.
logger = None
S
Steven Li 已提交
64

S
Shuduo Sang 已提交
65
def runThread(wt: WorkerThread):
    """Thread target: invoke ``wt.run()`` for the given worker."""
    wt.run()
67

68 69
class CrashGenError(Exception):
    """Exception carrying a message and an optional error number.

    Attributes are set directly from the constructor arguments:
        msg:   description of the failure (may be None).
        errno: error code associated with the failure (may be None).
    """

    def __init__(self, msg=None, errno=None):
        self.msg = msg
        self.errno = errno

    def __str__(self):
        # __str__ must return a str: a missing message becomes the empty
        # string and any non-str message is converted, instead of raising
        # TypeError when the exception is printed or logged.
        if self.msg is None:
            return ""
        return str(self.msg)

S
Shuduo Sang 已提交
76

S
Steven Li 已提交
77
class WorkerThread:
    """A worker that executes one task per step under a ThreadCoordinator.

    Each instance wraps a ``threading.Thread`` whose target is ``runThread``,
    plus a per-thread ``threading.Event`` (the "step gate") that the main
    thread sets to release this worker for the next step. When
    ``gConfig.per_thread_db_connection`` is set, the worker also owns its own
    ``DbConn``.
    """

    def __init__(self, pool: ThreadPool, tid, tc: ThreadCoordinator,
                 ):  # note: main thread context!
        """Create the worker; the underlying thread is not started here.

        Args:
            pool: the ThreadPool this worker belongs to.
            tid: identifier used in this worker's log messages.
            tc: the ThreadCoordinator that drives this worker.

        Raises:
            RuntimeError: if per-thread connections are enabled and
                ``gConfig.connector_type`` is not 'native', 'rest' or 'mixed'.
        """
        self._pool = pool
        self._tid = tid
        self._tc = tc  # type: ThreadCoordinator
        self._thread = threading.Thread(target=runThread, args=(self,))
        self._stepGate = threading.Event()

        # Let us have a DB connection of our own
        if gConfig.per_thread_db_connection:  # type: ignore
            connectorType = gConfig.connector_type
            if connectorType == 'native':
                self._dbConn = DbConn.createNative()
            elif connectorType == 'rest':
                self._dbConn = DbConn.createRest()
            elif connectorType == 'mixed':
                if Dice.throw(2) == 0:  # 1/2 chance
                    self._dbConn = DbConn.createNative()
                else:
                    self._dbConn = DbConn.createRest()
            else:
                raise RuntimeError("Unexpected connector type: {}".format(gConfig.connector_type))

        self._dbInUse = False  # if "use db" was executed already

    def logDebug(self, msg):
        """Log ``msg`` at debug level, prefixed with this worker's id."""
        logger.debug("    TRD[{}] {}".format(self._tid, msg))

    def logInfo(self, msg):
        """Log ``msg`` at info level, prefixed with this worker's id."""
        logger.info("    TRD[{}] {}".format(self._tid, msg))

    def dbInUse(self):
        """Return whether "use db" has been executed since the flag was last reset."""
        return self._dbInUse

    def useDb(self):
        """Execute "use db" unless already done, then mark the DB as in use."""
        if not self._dbInUse:
            self.execSql("use db")
        self._dbInUse = True

    def getTaskExecutor(self):
        """Return the coordinator's current task executor."""
        return self._tc.getTaskExecutor()

    def start(self):
        """Start the underlying thread."""
        self._thread.start()  # AFTER the thread is recorded

    def run(self):
        """Body of the worker thread: open the connection, loop over tasks, clean up."""
        # initialization after thread starts, in the thread context
        logger.info("Starting to run thread: {}".format(self._tid))

        if gConfig.per_thread_db_connection:  # type: ignore
            logger.debug("Worker thread openning database connection")
            self._dbConn.open()

        try:
            self._doTaskLoop()
        finally:
            # Clean up even when the task loop raised, so that the per-thread
            # connection is not left open.
            if gConfig.per_thread_db_connection:  # type: ignore
                if self._dbConn.isOpen:  # sometimes it is not open
                    self._dbConn.close()
                else:
                    logger.warning("Cleaning up worker thread, dbConn already closed")

    def _doTaskLoop(self):
        """Run steps until the coordinator stops or the shared barrier breaks.

        Each iteration: cross the shared barrier, wait at the per-thread gate,
        stop if the coordinator is no longer running, make sure the database
        is in use, then fetch, execute and record one task.
        """
        while True:
            tc = self._tc  # Thread Coordinator, the overall master
            try:
                tc.crossStepBarrier()  # shared barrier first, INCLUDING the last one
            except threading.BrokenBarrierError:  # main thread timed out
                print("_bto", end="")
                logger.debug("[TRD] Worker thread exiting due to main thread barrier time-out")
                break

            logger.debug("[TRD] Worker thread [{}] exited barrier...".format(self._tid))
            self.crossStepGate()   # then per-thread gate, after being tapped
            logger.debug("[TRD] Worker thread [{}] exited step gate...".format(self._tid))
            if not self._tc.isRunning():
                print("_wts", end="")
                logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...")
                break

            # Before we fetch the task and run it, let's ensure we properly "use" the database
            try:
                if gConfig.per_thread_db_connection:  # most likely TRUE
                    if not self._dbConn.isOpen:  # might have been closed during server auto-restart
                        self._dbConn.open()
                self.useDb()  # might encounter exceptions. TODO: catch
            except taos.error.ProgrammingError as err:
                errno = Helper.convertErrno(err.errno)
                # Ignored codes: invalid database, dropping, Unable to
                # establish connection, Database not ready
                if errno not in [0x383, 0x386, 0x00B, 0x014]:
                    print("\nCaught programming error. errno=0x{:X}, msg={} ".format(errno, err.msg))
                    raise

            # Fetch a task from the Thread Coordinator
            logger.debug("[TRD] Worker thread [{}] about to fetch task".format(self._tid))
            task = tc.fetchTask()

            # Execute such a task
            logger.debug(
                "[TRD] Worker thread [{}] about to execute task: {}".format(
                    self._tid, task.__class__.__name__))
            task.execute(self)
            tc.saveExecutedTask(task)
            logger.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid))

            self._dbInUse = False  # there may be changes between steps

    def verifyThreadSelf(self):  # ensure we are called by this own thread
        """Raise RuntimeError unless called from this worker's own thread."""
        if threading.get_ident() != self._thread.ident:
            raise RuntimeError("Unexpectly called from other threads")

    def verifyThreadMain(self):  # ensure we are called by the main thread
        """Raise RuntimeError unless called from the main thread."""
        if threading.get_ident() != threading.main_thread().ident:
            raise RuntimeError("Unexpectly called from other threads")

    def verifyThreadAlive(self):
        """Raise RuntimeError if the underlying thread is not alive."""
        if not self._thread.is_alive():
            raise RuntimeError("Unexpected dead thread")

    # A gate is different from a barrier in that a thread needs to be "tapped"
    def crossStepGate(self):
        """Block at the step gate until tapped, then re-arm the gate.

        Only callable from this worker's own, live thread.
        """
        self.verifyThreadAlive()
        self.verifyThreadSelf()  # only allowed by ourselves

        # Wait again at the "gate", waiting to be "tapped"
        logger.debug(
            "[TRD] Worker thread {} about to cross the step gate".format(
                self._tid))
        self._stepGate.wait()
        self._stepGate.clear()

    def tapStepGate(self):  # give it a tap, release the thread waiting there
        """Release this worker from its step gate; main thread only.

        If the worker thread is no longer alive, only a "_tad" marker is printed.
        """
        self.verifyThreadMain()  # only allowed for main thread

        if self._thread.is_alive():
            logger.debug("[TRD] Tapping worker thread {}".format(self._tid))
            self._stepGate.set()  # wake up!
            time.sleep(0)  # let the released thread run a bit
        else:
            print("_tad", end="")  # Thread already dead

    def execSql(self, sql):  # TODO: expose DbConn directly
        """Execute ``sql`` on the connection returned by ``getDbConn()``."""
        return self.getDbConn().execute(sql)

    def querySql(self, sql):  # TODO: expose DbConn directly
        """Run the query ``sql`` on the connection returned by ``getDbConn()``."""
        return self.getDbConn().query(sql)

    def getQueryResult(self):
        """Return the query result held by the connection from ``getDbConn()``."""
        return self.getDbConn().getQueryResult()

    def getDbConn(self):
        """Return this worker's own connection, or the DB manager's when not per-thread."""
        if gConfig.per_thread_db_connection:
            return self._dbConn
        return self._tc.getDbManager().getDbConn()
245

246 247
    # def querySql(self, sql): # not "execute", since we are out side the DB context
    #     if ( gConfig.per_thread_db_connection ):
S
Shuduo Sang 已提交
248
    #         return self._dbConn.query(sql)
249 250
    #     else:
    #         return self._tc.getDbState().getDbConn().query(sql)
251

252
# The coordinator of all worker threads, mostly running in main thread
S
Shuduo Sang 已提交
253 254


255
class ThreadCoordinator:
    """Coordinates all worker threads step by step, mostly from the main thread.

    Workers and the main thread meet at a shared ``threading.Barrier`` sized
    ``numThreads + 1``; the main thread then does its house keeping (abort
    check, DB state transition) and releases each worker through its
    per-thread gate. A ``None`` task executor (``_te``) signals workers to stop.
    """

    WORKER_THREAD_TIMEOUT = 60  # one minute

    def __init__(self, pool: ThreadPool, dbManager):
        """Set up coordination state for ``pool``, backed by ``dbManager``."""
        self._curStep = -1  # first step is 0
        self._pool = pool
        self._te = None  # prepare for every new step
        self._dbManager = dbManager
        self._executedTasks: List[Task] = []  # in a given step
        self._lock = threading.RLock()  # sync access for a few things

        self._stepBarrier = threading.Barrier(
            self._pool.numThreads + 1)  # one barrier for all threads
        self._execStats = ExecutionStats()
        self._runStatus = MainExec.STATUS_RUNNING

    def getTaskExecutor(self):
        """Return the current task executor, or None when not running."""
        return self._te

    def getDbManager(self) -> DbManager:
        """Return the DB manager given at construction."""
        return self._dbManager

    def crossStepBarrier(self, timeout=None):
        """Wait at the shared step barrier, optionally with a timeout."""
        self._stepBarrier.wait(timeout)

    def requestToStop(self):
        """Mark the run as stopping and register a "User Interruption" failure."""
        self._runStatus = MainExec.STATUS_STOPPING
        self._execStats.registerFailure("User Interruption")

    def _runShouldEnd(self, transitionFailed, hasAbortedTask, workerTimeout):
        """Return True when the main loop must stop.

        That is: the last configured step was reached, the run status is no
        longer RUNNING, or any of the three failure flags is set.
        """
        maxSteps = gConfig.max_steps  # type: ignore
        if self._curStep >= (maxSteps - 1):  # maxStep==10, last curStep should be 9
            return True
        if self._runStatus != MainExec.STATUS_RUNNING:
            return True
        return bool(transitionFailed or hasAbortedTask or workerTimeout)

    def _hasAbortedTask(self):  # from execution of previous step
        """Return True if any recorded executed task reports itself aborted."""
        return any(task.isAborted() for task in self._executedTasks)

    def _releaseAllWorkerThreads(self, transitionFailed):
        """Advance to the next step and tap every worker's gate.

        A fresh TaskExecutor is installed only when the transition succeeded;
        otherwise the executor stays None, which tells workers to stop.
        """
        self._curStep += 1  # we are about to get into next step. TODO: race condition here!
        # Now not all threads had time to go to sleep
        logger.debug(
            "--\r\n\n--> Step {} starts with main thread waking up".format(self._curStep))

        # A new TE for the new step
        self._te = None  # set to empty first, to signal worker thread to stop
        if not transitionFailed:  # only if not failed
            self._te = TaskExecutor(self._curStep)

        logger.debug("[TRD] Main thread waking up at step {}, tapping worker threads".format(
                self._curStep))  # Now not all threads had time to go to sleep
        # Worker threads will wake up at this point, and each execute its own task
        self.tapAllThreads()  # release all worker thread from their "gate"

    def _syncAtBarrier(self):
        """Cross the shared barrier (bounded by WORKER_THREAD_TIMEOUT), then reset it."""
        # Now main thread (that's us) is ready to enter a step:
        # let other threads go past the pool barrier, but wait at the
        # thread gate
        logger.debug("[TRD] Main thread about to cross the barrier")
        self.crossStepBarrier(timeout=self.WORKER_THREAD_TIMEOUT)
        self._stepBarrier.reset()  # Other worker threads should now be at the "gate"
        logger.debug("[TRD] Main thread finished crossing the barrier")

    def _doTransition(self):
        """Transition the DB state using the executed tasks, then clear them.

        Returns:
            True if the transition failed because of a broken DB connection
            ('network unavailable'), False otherwise.

        Raises:
            taos.error.ProgrammingError: any other programming error is
                re-raised; the executed tasks are left untouched in that case.
        """
        transitionFailed = False
        try:
            sm = self._dbManager.getStateMachine()
            logger.debug("[STT] starting transitions")
            # at end of step, transition the DB state
            sm.transition(self._executedTasks)
            logger.debug("[STT] transition ended")
            # Due to limitation (or maybe not) of the Python library,
            # we cannot share connections across threads.
            # Here we are in main thread, we cannot operate the connections
            # created in workers, so "use db" is done in the worker task loop.
        except taos.error.ProgrammingError as err:
            if err.msg != 'network unavailable':
                raise
            # broken DB connection
            logger.info("DB connection broken, execution failed")
            traceback.print_stack()
            transitionFailed = True
            self._te = None  # Not running any more
            self._execStats.registerFailure("Broken DB Connection")
            # no early exit here: need to tap all threads at the end,
            # and maybe signal them to stop

        # An early return used to sit here, which made the reset below
        # unreachable, so tasks piled up across steps.
        self.resetExecutedTasks()  # clear the tasks after we are done
        # Get ready for next step
        logger.debug("<-- Step {} finished, trasition failed = {}".format(self._curStep, transitionFailed))
        return transitionFailed

    def run(self):
        """Start the workers and drive them step by step until the run ends."""
        self._pool.createAndStartThreads(self)

        # Coordinate all threads step by step
        self._curStep = -1  # not started yet

        self._execStats.startExec()  # start the stop watch
        transitionFailed = False
        hasAbortedTask = False
        workerTimeout = False
        while not self._runShouldEnd(transitionFailed, hasAbortedTask, workerTimeout):
            if not gConfig.debug:  # print this only if we are not in debug mode
                print(".", end="", flush=True)

            try:
                self._syncAtBarrier()  # For now just cross the barrier
            except threading.BrokenBarrierError:
                logger.info("Main loop aborted, caused by worker thread time-out")
                self._execStats.registerFailure("Aborted due to worker thread timeout")
                print("\n\nWorker Thread time-out detected, important thread info:")
                ts = ThreadStacks()
                ts.print(filterInternal=True)
                workerTimeout = True
                break

            # At this point, all threads should be past the overall "barrier" and before the per-thread "gate".
            # We use this period to do house keeping work, when all worker
            # threads are QUIET.
            hasAbortedTask = self._hasAbortedTask()  # from previous step
            if hasAbortedTask:
                logger.info("Aborted task encountered, exiting test program")
                self._execStats.registerFailure("Aborted Task Encountered")
                break  # do transition only if tasks are error free

            # Ending previous step
            try:
                transitionFailed = self._doTransition()  # To start, we end step -1 first
            except taos.error.ProgrammingError as err:
                transitionFailed = True
                errno2 = Helper.convertErrno(err.errno)  # correct error scheme
                errMsg = "Transition failed: errno=0x{:X}, msg: {}".format(errno2, err)
                logger.info(errMsg)
                self._execStats.registerFailure(errMsg)

            # Then we move on to the next step
            self._releaseAllWorkerThreads(transitionFailed)

        if hasAbortedTask or transitionFailed:  # abnormal ending, workers waiting at "gate"
            logger.debug("Abnormal ending of main thraed")
        elif workerTimeout:
            logger.debug("Abnormal ending of main thread, due to worker timeout")
        else:  # regular ending, workers waiting at "barrier"
            logger.debug("Regular ending, main thread waiting for all worker threads to stop...")
            self._syncAtBarrier()

        self._te = None  # No more executor, time to end
        logger.debug("Main thread tapping all threads one last time...")
        self.tapAllThreads()  # Let the threads run one last time

        logger.debug("\r\n\n--> Main thread ready to finish up...")
        logger.debug("Main thread joining all threads")
        self._pool.joinAll()  # Get all threads to finish
        logger.info("\nAll worker threads finished")
        self._execStats.endExec()

    def printStats(self):
        """Print the execution statistics."""
        self._execStats.printStats()

    def isFailed(self):
        """Return whether the execution statistics report a failure."""
        return self._execStats.isFailed()

    def getExecStats(self):
        """Return the execution statistics object."""
        return self._execStats

    def tapAllThreads(self):  # in a deterministic manner
        """Tap every worker's step gate, in an order shuffled by Dice throws."""
        wakeSeq = []
        for i in range(self._pool.numThreads):  # generate a random sequence
            if Dice.throw(2) == 1:
                wakeSeq.append(i)
            else:
                wakeSeq.insert(0, i)
        logger.debug(
            "[TRD] Main thread waking up worker threads: {}".format(
                str(wakeSeq)))
        # TODO: set dice seed to a deterministic value
        for i in wakeSeq:
            # TODO: maybe a bit too deep?!
            self._pool.threadList[i].tapStepGate()
            time.sleep(0)  # yield

    def isRunning(self):
        """Return True while a task executor is installed."""
        return self._te is not None

    def fetchTask(self) -> Task:
        """Create a task of a type picked by the DB state machine.

        Raises:
            RuntimeError: if the coordinator is not running.
        """
        if not self.isRunning():  # no task
            raise RuntimeError("Cannot fetch task when not running")
        # pick a task type for current state
        taskType = self.getDbManager().getStateMachine().pickTaskType()
        return taskType(
            self.getDbManager(),
            self._execStats)  # create a task from it

    def resetExecutedTasks(self):
        """Forget all recorded executed tasks."""
        self._executedTasks = []  # should be under single thread

    def saveExecutedTask(self, task):
        """Record ``task`` as executed, under the coordinator lock."""
        with self._lock:
            self._executedTasks.append(task)
485 486

# We define a class to run a number of threads in locking steps.
S
Shuduo Sang 已提交
487

488 489 490 491
class Helper:
    """Small shared utilities."""

    @classmethod
    def convertErrno(cls, errno):
        """Return ``errno`` unchanged when positive, otherwise ``errno + 0x80000000``."""
        if errno > 0:
            return errno
        return errno + 0x80000000
S
Shuduo Sang 已提交
492

493
class ThreadPool:
    """Holds the worker threads that run in locked steps."""

    def __init__(self, numThreads, maxSteps):
        """Remember the pool size and step limit; no threads are created yet."""
        self.threadList = []  # type: List[WorkerThread]
        # Internal class variables
        self.curStep = 0
        self.maxSteps = maxSteps
        self.numThreads = numThreads

    # starting to run all the threads, in locking steps
    def createAndStartThreads(self, tc: ThreadCoordinator):
        """Create ``numThreads`` workers bound to ``tc``, recording each before starting it."""
        for workerId in range(self.numThreads):
            worker = WorkerThread(self, workerId, tc)
            self.threadList.append(worker)
            worker.start()  # start, but should block immediately before step 0

    def joinAll(self):
        """Join every worker's underlying thread, in list order."""
        for worker in self.threadList:
            logger.debug("Joining thread...")
            worker._thread.join()

513 514
# A queue of contiguous POSITIVE integers, used by DbManager to generate continuous numbers
# for new table names
S
Shuduo Sang 已提交
515 516


S
Steven Li 已提交
517 518
class LinearQueue():
    """A range of consecutive positive integers ``[firstIndex..lastIndex]``.

    New numbers are appended at the tail with ``push`` and removed from the
    head with ``pop``. Numbers currently handed out are tracked in ``inUse``;
    a head that is in use cannot be popped.
    """

    def __init__(self):
        self.inUse = set()  # the indexes that are in use right now
        self._lock = threading.RLock()  # our functions may call each other
        self.lastIndex = 0
        self.firstIndex = 1  # 1st ever element

    def toText(self):
        """Return a textual description of the range and the in-use set."""
        template = "[{}..{}], in use: {}"
        return template.format(self.firstIndex, self.lastIndex, self.inUse)

    # Push (add new element, largest) to the tail, and mark it in use
    def push(self):
        """Extend the range by one, mark the new index in use and return it."""
        with self._lock:
            self.lastIndex += 1
            newest = self.lastIndex
            self.allocate(newest)
            return newest

    def pop(self):
        """Remove and return the head index.

        Returns False when the queue is empty or the head is in use.
        """
        with self._lock:
            if self.isEmpty() or self.firstIndex in self.inUse:
                return False  # TODO: None?
            head = self.firstIndex
            self.firstIndex = head + 1
            return head

    def isEmpty(self):
        """Return True when the range holds no index."""
        return self.lastIndex < self.firstIndex

    def popIfNotEmpty(self):
        """Return 0 when empty, otherwise whatever ``pop()`` returns."""
        with self._lock:
            return 0 if self.isEmpty() else self.pop()

    def allocate(self, i):
        """Mark index ``i`` as in use; RuntimeError if it already is."""
        with self._lock:
            if i not in self.inUse:
                self.inUse.add(i)
                return
            raise RuntimeError(
                "Cannot re-use same index in queue: {}".format(i))

    def release(self, i):
        """Mark index ``i`` as no longer in use; KeyError if it was not."""
        with self._lock:
            self.inUse.remove(i)  # KeyError possible, TODO: why?

    def size(self):
        """Return the number of indexes in the range."""
        return 1 + self.lastIndex - self.firstIndex

    def pickAndAllocate(self):
        """Pick an index in the range via Dice, mark it in use and return it.

        Returns None when the queue is empty, or when no free index was hit
        within 10x the queue size attempts.
        """
        if self.isEmpty():
            return None
        with self._lock:
            attempts = 0  # counting the iterations
            while True:
                attempts += 1
                if attempts > 10 * self.size():  # 10x iteration already
                    return None
                candidate = Dice.throwRange(self.firstIndex, self.lastIndex + 1)
                if candidate in self.inUse:
                    continue
                self.allocate(candidate)
                return candidate

S
Shuduo Sang 已提交
593

594
class DbConn:
    """Base class of database connections.

    Concrete connection types override ``openByType``, ``execute``, ``query``,
    ``getQueryResult``, ``getResultRows`` and ``getResultCols``; the versions
    here only raise RuntimeError.
    """

    TYPE_NATIVE = "native-c"
    TYPE_REST = "rest-api"
    TYPE_INVALID = "invalid"

    @classmethod
    def create(cls, connType):
        """Return a new connection object for ``connType``.

        Raises:
            RuntimeError: if ``connType`` is neither TYPE_NATIVE nor TYPE_REST.
        """
        if connType == cls.TYPE_NATIVE:
            return DbConnNative()
        if connType == cls.TYPE_REST:
            return DbConnRest()
        raise RuntimeError(
            "Unexpected connection type: {}".format(connType))

    @classmethod
    def createNative(cls):
        """Return a new connection of type TYPE_NATIVE."""
        return cls.create(cls.TYPE_NATIVE)

    @classmethod
    def createRest(cls):
        """Return a new connection of type TYPE_REST."""
        return cls.create(cls.TYPE_REST)

    def __init__(self):
        self._lastSql = None
        self._type = self.TYPE_INVALID
        self.isOpen = False

    def getLastSql(self):
        """Return the value of the last-SQL slot (None initially)."""
        return self._lastSql

    def open(self):
        """Open the connection through ``openByType`` and mark it open.

        Raises:
            RuntimeError: if the connection is already open.
        """
        if self.isOpen:
            raise RuntimeError("Cannot re-open an existing DB connection")

        # below implemented by child classes
        self.openByType()

        logger.debug("[DB] data connection opened, type = {}".format(self._type))
        self.isOpen = True

    def resetDb(self):  # reset the whole database, etc.
        """Drop database ``db`` if it exists.

        Raises:
            RuntimeError: if the connection is not open.
        """
        if not self.isOpen:
            raise RuntimeError("Cannot reset database until connection is open")

        self.execute('drop database if exists db')
        logger.debug("Resetting DB, dropped database")

    def queryScalar(self, sql) -> int:
        """Run ``sql`` and return its single value (see ``_queryAny``)."""
        return self._queryAny(sql)

    def queryString(self, sql) -> str:
        """Run ``sql`` and return its single value (see ``_queryAny``)."""
        return self._queryAny(sql)

    def _queryAny(self, sql):
        """Run ``sql``, which must yield exactly one row and one column, and return that value.

        Raises:
            RuntimeError: if the connection is not open, or the result is not
                exactly one row by one column.
        """
        if not self.isOpen:
            raise RuntimeError("Cannot query database until connection is open")
        nRows = self.query(sql)
        if nRows != 1:
            raise RuntimeError("Unexpected result for query: {}, rows = {}".format(sql, nRows))
        if not (self.getResultRows() == 1 and self.getResultCols() == 1):
            raise RuntimeError("Unexpected result set for query: {}".format(sql))
        firstRow = self.getQueryResult()[0]
        return firstRow[0]

    def use(self, dbName):
        """Execute ``use <dbName>``."""
        statement = "use {}".format(dbName)
        self.execute(statement)

    def hasDatabases(self):
        """Return True when "show databases" yields more than one row."""
        # We now have a "log" database by default
        return self.query("show databases") > 1

    def hasTables(self):
        """Return True when "show tables" yields at least one row."""
        return self.query("show tables") > 0

    def execute(self, sql):
        """Execute ``sql``; to be overridden by child classes."""
        raise RuntimeError("Unexpected execution, should be overriden")

    def query(self, sql) -> int:  # return num rows returned
        """Run the query ``sql``; to be overridden by child classes."""
        raise RuntimeError("Unexpected execution, should be overriden")

    def openByType(self):
        """Type-specific open; to be overridden by child classes."""
        raise RuntimeError("Unexpected execution, should be overriden")

    def getQueryResult(self):
        """Return the last query result; to be overridden by child classes."""
        raise RuntimeError("Unexpected execution, should be overriden")

    def getResultRows(self):
        """Return the row count of the last result; to be overridden by child classes."""
        raise RuntimeError("Unexpected execution, should be overriden")

    def getResultCols(self):
        """Return the column count of the last result; to be overridden by child classes."""
        raise RuntimeError("Unexpected execution, should be overriden")

# Sample: curl -u root:taosdata -d "show databases" localhost:6020/rest/sql
S
Shuduo Sang 已提交
690 691


692 693 694 695
class DbConnRest(DbConn):
    """REST flavour of ``DbConn``: every SQL statement is POSTed to ``self._url``.

    The decoded JSON body of the most recent successful statement is kept in
    ``self._result`` and backs ``getQueryResult``, ``getResultRows`` and
    ``getResultCols``.
    """

    def __init__(self):
        super().__init__()
        self._type = self.TYPE_REST
        self._url = "http://localhost:6041/rest/sql"  # fixed for now
        self._result = None  # JSON dict of the last successful statement

    def openByType(self):  # Open connection
        """Nothing to open here; statements are sent one HTTP request at a time."""
        pass  # do nothing, always open

    def close(self):
        """Mark the connection closed.

        Raises:
            RuntimeError: if the connection is not currently open.
        """
        if not self.isOpen:
            raise RuntimeError("Cannot clean up database until connection is open")
        # Do nothing for REST
        logger.debug("[DB] REST Database connection closed")
        self.isOpen = False

    def _doSql(self, sql):
        """POST ``sql`` to the REST endpoint and return the reported row count.

        On success the decoded JSON reply is stored in ``self._result``.

        Raises:
            taos.error.ProgrammingError: the server replied with status
                ``error``; carries the reply's ``desc`` (or a placeholder when
                the reply has none) and ``code``.
            RuntimeError: the reply has no ``status``, is an error without a
                ``code``, or has a status other than ``succ``.
        """
        self._lastSql = sql  # remember this, last SQL attempted
        try:
            r = requests.post(self._url,
                              data=sql,
                              auth=HTTPBasicAuth('root', 'taosdata'))
        except Exception:
            # Report and propagate; the caller decides what to do with it.
            print("REST API Failure (TODO: more info here)")
            raise
        rj = r.json()

        # Sanity check for the "Json Result"
        if 'status' not in rj:
            raise RuntimeError("No status in REST response")

        if rj['status'] == 'error':  # clearly reported error
            if 'code' not in rj:  # error without code
                raise RuntimeError("REST error return without code")
            errno = rj['code']  # May need to massage this in the future
            # A reply without 'desc' must still surface as ProgrammingError,
            # not as a KeyError raised while building it.
            desc = rj.get('desc', "<no description in REST error reply>")
            raise taos.error.ProgrammingError(desc, errno)

        if rj['status'] != 'succ':  # better be this
            raise RuntimeError(
                "Unexpected REST return status: {}".format(rj['status']))

        nRows = rj['rows'] if ('rows' in rj) else 0
        self._result = rj
        return nRows

    def execute(self, sql):
        """Send ``sql`` to the server and return the row count from the reply.

        Raises:
            RuntimeError: if the connection is not open.
        """
        if not self.isOpen:
            raise RuntimeError(
                "Cannot execute database commands until connection is open")
        logger.debug("[SQL-REST] Executing SQL: {}".format(sql))
        nRows = self._doSql(sql)
        logger.debug(
            "[SQL-REST] Execution Result, nRows = {}, SQL = {}".format(nRows, sql))
        return nRows

    def query(self, sql):
        """Run a query; over REST this is the same request as ``execute``."""
        return self.execute(sql)

    def getQueryResult(self):
        """Return the ``data`` member of the last successful reply."""
        return self._result['data']

    def _requireResult(self):
        """Return the last reply, or raise RuntimeError if nothing has succeeded yet."""
        if self._result is None:
            raise RuntimeError("No REST result available, run a statement first")
        return self._result

    def getResultRows(self):
        """Return the row count of the last reply (its ``rows`` member, else 0)."""
        result = self._requireResult()
        return result['rows'] if ('rows' in result) else 0

    def getResultCols(self):
        """Return the column count of the last reply.

        Uses the length of the reply's ``head`` list when present, otherwise
        the width of the first row in ``data``; 0 when neither is available.
        """
        result = self._requireResult()
        # NOTE(review): 'head' is taken to be the column-name list of the REST
        # reply; confirm against the server version in use.
        if 'head' in result:
            return len(result['head'])
        rows = result.get('data') or []
        return len(rows[0]) if rows else 0
S
Shuduo Sang 已提交
764

765
    # Duplicate code from TDMySQL, TODO: merge all this into DbConnNative
766 767


768
class MyTDSql:
    """Thin wrapper around one native ``taos`` connection and its cursor.

    The outcome of the latest statement is kept on the instance:
    ``queryResult``/``queryRows``/``queryCols`` after ``query()`` and
    ``affectedRows`` after ``execute()``. ``sql`` holds the last statement text.
    """

    def __init__(self, hostAddr, cfgPath):
        # Make the DB connection
        self._conn = taos.connect(host=hostAddr, config=cfgPath)
        self._cursor = self._conn.cursor()

        self.sql = None  # text of the last statement attempted
        self.queryResult = None  # rows fetched by the last query()
        self.queryRows = 0
        self.queryCols = 0
        self.affectedRows = 0

    def close(self):
        """Close the cursor, then the connection.

        The cursor goes first so it is never closed on top of an already
        closed connection; closing the cursor alone does NOT close the DB
        connection, so the connection is closed even if the cursor fails.
        """
        try:
            self._cursor.close()
        finally:
            self._conn.close()

    def query(self, sql):
        """Execute ``sql``, fetch all rows and return how many were fetched.

        Exceptions raised by the cursor propagate unchanged.
        """
        self.sql = sql
        self._cursor.execute(sql)
        self.queryResult = self._cursor.fetchall()
        self.queryRows = len(self.queryResult)
        self.queryCols = len(self._cursor.description)
        return self.queryRows

    def execute(self, sql):
        """Execute ``sql`` and return the value the cursor's ``execute`` returns.

        Exceptions raised by the cursor propagate unchanged.
        """
        self.sql = sql
        self.affectedRows = self._cursor.execute(sql)
        return self.affectedRows

S
Shuduo Sang 已提交
813

814
class DbConnNative(DbConn):
    """``DbConn`` implementation backed by the native connector via ``MyTDSql``.

    Class-level state is shared by all instances: ``_lock`` serialises
    connection setup and the counter updates, ``_connInfoDisplayed`` makes the
    connection banner appear once, and ``totalConnections`` counts the
    connections currently open.
    """

    # Class variables
    _lock = threading.Lock()
    _connInfoDisplayed = False
    totalConnections = 0  # Not private

    def __init__(self):
        super().__init__()
        self._type = self.TYPE_NATIVE
        self._conn = None
        self._tdSql = None  # set by openByType()

    def getBuildPath(self):
        """Locate the build directory by searching for a ``taosd`` file.

        The project root is derived from this file's real path (cut at
        "communit" when the path contains "community", otherwise at "tests").
        The tree is then walked for a directory holding ``taosd`` whose parent
        path does not contain "packaging".

        Returns:
            The matching directory path with the length of "/build/bin"
            trimmed from its end.

        Raises:
            RuntimeError: if no suitable directory is found.
        """
        selfPath = os.path.dirname(os.path.realpath(__file__))
        if "community" in selfPath:
            projPath = selfPath[:selfPath.find("communit")]
        else:
            projPath = selfPath[:selfPath.find("tests")]

        buildPath = None
        for root, _dirs, files in os.walk(projPath):
            if "taosd" not in files:
                continue
            rootRealPath = os.path.dirname(os.path.realpath(root))
            if "packaging" not in rootRealPath:
                buildPath = root[:len(root) - len("/build/bin")]
                break
        if buildPath is None:
            raise RuntimeError("Failed to determine buildPath, selfPath={}, projPath={}"
                               .format(selfPath, projPath))
        return buildPath

    def openByType(self):  # Open connection
        """Create the native connection and reset the server query cache."""
        cfgPath = self.getBuildPath() + "/test/cfg"
        hostAddr = "127.0.0.1"

        cls = self.__class__  # Get the class, to access class variables
        with cls._lock:  # force single threading for opening DB connections
            if not cls._connInfoDisplayed:
                cls._connInfoDisplayed = True  # updating CLASS variable
                logger.info("Initiating TAOS native connection to {}, using config at {}".format(hostAddr, cfgPath))
            self._tdSql = MyTDSql(hostAddr, cfgPath)  # making DB connection
            cls.totalConnections += 1  # Record the count in the class

        self._tdSql.execute('reset query cache')

    def close(self):
        """Close the native connection and decrement the class-wide counter.

        Raises:
            RuntimeError: if the connection is not currently open.
        """
        if not self.isOpen:
            raise RuntimeError("Cannot clean up database until connection is open")
        self._tdSql.close()
        # Decrement the class wide counter
        cls = self.__class__  # Get the class, to access class variables
        with cls._lock:
            cls.totalConnections -= 1

        logger.debug("[DB] Database connection closed")
        self.isOpen = False

    def execute(self, sql):
        """Execute ``sql`` and return what ``MyTDSql.execute`` reports.

        Raises:
            RuntimeError: if the connection is not open.
        """
        if not self.isOpen:
            raise RuntimeError("Cannot execute database commands until connection is open")
        logger.debug("[SQL] Executing SQL: {}".format(sql))
        self._lastSql = sql
        nRows = self._tdSql.execute(sql)
        logger.debug(
            "[SQL] Execution Result, nRows = {}, SQL = {}".format(
                nRows, sql))
        return nRows

    def query(self, sql):
        """Run ``sql`` as a query and return the number of rows fetched.

        The rows themselves are available through ``getQueryResult()``.

        Raises:
            RuntimeError: if the connection is not open.
        """
        if not self.isOpen:
            raise RuntimeError(
                "Cannot query database until connection is open")
        logger.debug("[SQL] Executing SQL: {}".format(sql))
        self._lastSql = sql
        nRows = self._tdSql.query(sql)
        logger.debug(
            "[SQL] Query Result, nRows = {}, SQL = {}".format(
                nRows, sql))
        return nRows

    def getQueryResult(self):
        """Return the rows fetched by the last query."""
        return self._tdSql.queryResult

    def getResultRows(self):
        """Return the row count recorded by the last query."""
        return self._tdSql.queryRows

    def getResultCols(self):
        """Return the column count recorded by the last query."""
        return self._tdSql.queryCols
913

S
Shuduo Sang 已提交
914

915
class AnyState:
    """Base class describing one coarse database state and what it permits.

    Subclasses supply ``getInfo()``, a list whose first entry is one of the
    ``STATE_*`` values and whose remaining entries are the capability flags
    addressed by the ``CAN_*`` indexes. The ``assert*``/``has*`` helpers
    inspect a list of finished tasks by task class.
    """

    STATE_INVALID = -1
    STATE_EMPTY = 0  # nothing there, no even a DB
    STATE_DB_ONLY = 1  # we have a DB, but nothing else
    STATE_TABLE_ONLY = 2  # we have a table, but totally empty
    STATE_HAS_DATA = 3  # we have some data in the table
    _stateNames = ["Invalid", "Empty", "DB_Only", "Table_Only", "Has_Data"]

    # Positions inside the list returned by getInfo()
    STATE_VAL_IDX = 0
    CAN_CREATE_DB = 1
    CAN_DROP_DB = 2
    CAN_CREATE_FIXED_SUPER_TABLE = 3
    CAN_DROP_FIXED_SUPER_TABLE = 4
    CAN_ADD_DATA = 5
    CAN_READ_DATA = 6

    def __init__(self):
        self._info = self.getInfo()

    def __str__(self):
        # STATE_INVALID is -1, so shift by one to land on the right name.
        nameIdx = self._info[self.STATE_VAL_IDX] + 1
        return self._stateNames[nameIdx]

    def getInfo(self):
        """Return the state value followed by the capability flags."""
        raise RuntimeError("Must be overriden by child classes")

    def equals(self, other):
        """Compare this state's value with an int or with another AnyState.

        Raises:
            RuntimeError: if ``other`` is neither an int nor an AnyState.
        """
        if isinstance(other, int):
            otherValue = other
        elif isinstance(other, AnyState):
            otherValue = other.getValIndex()
        else:
            raise RuntimeError(
                "Unexpected comparison, type = {}".format(
                    type(other)))
        return self.getValIndex() == otherValue

    def verifyTasksToState(self, tasks, newState):
        """Check that ``tasks`` can explain the move from this state to ``newState``."""
        raise RuntimeError("Must be overriden by child classes")

    def getValIndex(self):
        return self._info[self.STATE_VAL_IDX]

    def getValue(self):
        return self._info[self.STATE_VAL_IDX]

    def canCreateDb(self):
        return self._info[self.CAN_CREATE_DB]

    def canDropDb(self):
        return self._info[self.CAN_DROP_DB]

    def canCreateFixedSuperTable(self):
        return self._info[self.CAN_CREATE_FIXED_SUPER_TABLE]

    def canDropFixedSuperTable(self):
        return self._info[self.CAN_DROP_FIXED_SUPER_TABLE]

    def canAddData(self):
        return self._info[self.CAN_ADD_DATA]

    def canReadData(self):
        return self._info[self.CAN_READ_DATA]

    def assertAtMostOneSuccess(self, tasks, cls):
        """Raise RuntimeError as soon as a second successful ``cls`` task is seen."""
        successes = (t for t in tasks if isinstance(t, cls) and t.isSuccess())
        next(successes, None)  # a first success is allowed
        if next(successes, None) is not None:
            raise RuntimeError(
                "Unexpected more than 1 success with task: {}".format(cls))

    def assertIfExistThenSuccess(self, tasks, cls):
        """Raise RuntimeError if ``cls`` tasks are present but none succeeded."""
        matching = [t for t in tasks if isinstance(t, cls)]
        successCount = sum(1 for t in matching if t.isSuccess())
        if matching and successCount <= 0:
            raise RuntimeError(
                "Unexpected zero success for task: {}".format(cls))

    def assertNoTask(self, tasks, cls):
        """Raise CrashGenError if any task of type ``cls`` is present."""
        if any(isinstance(t, cls) for t in tasks):
            raise CrashGenError(
                "This task: {}, is not expected to be present, given the success/failure of others".format(cls.__name__))

    def assertNoSuccess(self, tasks, cls):
        """Raise RuntimeError if any task of type ``cls`` succeeded."""
        if any(isinstance(t, cls) and t.isSuccess() for t in tasks):
            raise RuntimeError(
                "Unexpected successful task: {}".format(cls))

    def hasSuccess(self, tasks, cls):
        """Return True if at least one task of type ``cls`` succeeded."""
        return any(isinstance(t, cls) and t.isSuccess() for t in tasks)

    def hasTask(self, tasks, cls):
        """Return True if at least one task of type ``cls`` is present."""
        return any(isinstance(t, cls) for t in tasks)

S
Shuduo Sang 已提交
1030

1031 1032 1033 1034
class StateInvalid(AnyState):
    """State value ``STATE_INVALID``: every capability flag is off."""

    def getInfo(self):
        # create/drop Db, create/drop fixed table, insert/read data: all False
        noCapabilities = [False] * 6
        return [self.STATE_INVALID] + noCapabilities

    # def verifyTasksToState(self, tasks, newState):

S
Shuduo Sang 已提交
1042

1043 1044 1045 1046
class StateEmpty(AnyState):
    """State value ``STATE_EMPTY``: the only permitted action is creating a DB."""

    def getInfo(self):
        canCreateDb, canDropDb = True, False
        canCreateTable, canDropTable = False, False
        canAddData, canReadData = False, False
        return [
            self.STATE_EMPTY,
            canCreateDb, canDropDb,
            canCreateTable, canDropTable,
            canAddData, canReadData,
        ]

    def verifyTasksToState(self, tasks, newState):
        """Allow at most one successful DB creation when no drop-DB task ran.

        ``newState`` is not consulted by this check.
        """
        # at EMPTY, if there's success in creating DB and no drop_db tasks,
        # we must have at most one. TODO: compare numbers
        createdDb = self.hasSuccess(tasks, TaskCreateDb)
        if createdDb and not self.hasTask(tasks, TaskDropDb):
            self.assertAtMostOneSuccess(tasks, TaskCreateDb)

1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069

class StateDbOnly(AnyState):
    """State value ``STATE_DB_ONLY``: may drop the DB or create the fixed super table."""

    def getInfo(self):
        canCreateDb, canDropDb = False, True
        canCreateTable, canDropTable = True, False
        canAddData, canReadData = False, False
        return [
            self.STATE_DB_ONLY,
            canCreateDb, canDropDb,
            canCreateTable, canDropTable,
            canAddData, canReadData,
        ]

    def verifyTasksToState(self, tasks, newState):
        """Allow at most one successful DB drop when no create-DB task ran.

        ``newState`` is not consulted by this check.
        """
        createDbAttempted = self.hasTask(tasks, TaskCreateDb)
        if not createDbAttempted:
            # only if we don't create any more
            self.assertAtMostOneSuccess(tasks, TaskDropDb)

        # TODO: restore the below, the problem exists, although unlikely in real-world
        # if (gSvcMgr!=None) and gSvcMgr.isRestarting():
        # if (gSvcMgr == None) or (not gSvcMgr.isRestarting()) :
        #     self.assertIfExistThenSuccess(tasks, TaskDropDb)
1078

S
Shuduo Sang 已提交
1079

1080
class StateSuperTableOnly(AnyState):
    """State value ``STATE_TABLE_ONLY``: DB and table can be dropped, data added and read."""

    def getInfo(self):
        canCreateDb, canDropDb = False, True
        canCreateTable, canDropTable = False, True
        canAddData, canReadData = True, True
        return [
            self.STATE_TABLE_ONLY,
            canCreateDb, canDropDb,
            canCreateTable, canDropTable,
            canAddData, canReadData,
        ]

    def verifyTasksToState(self, tasks, newState):
        """Inspect the tasks after a step that started with only the super table.

        Nothing is enforced at present: the one remaining call computes a
        result that is not used. ``newState`` is not consulted.
        """
        droppedTable = self.hasSuccess(tasks, TaskDropSuperTable)
        if droppedTable:  # we are able to drop the table
            # we must have had recreated it
            # NOTE(review): the result below is discarded, so this is not an
            # assertion; confirm whether a check was intended.
            self.hasSuccess(tasks, TaskCreateSuperTable)

        # Earlier, stricter checks on add-data / read-data / drop-table
        # combinations were disabled here; the original notes say they are not
        # true in massively parallel cases.
        # TODO: need to revamp!!
        # TODO: need to revamp!!
1107

S
Shuduo Sang 已提交
1108

1109 1110 1111 1112 1113 1114 1115 1116 1117 1118
class StateHasData(AnyState):
    """State value ``STATE_HAS_DATA``: same capability flags as the table-only state."""

    def getInfo(self):
        canCreateDb, canDropDb = False, True
        canCreateTable, canDropTable = False, True
        canAddData, canReadData = True, True
        return [
            self.STATE_HAS_DATA,
            canCreateDb, canDropDb,
            canCreateTable, canDropTable,
            canAddData, canReadData,
        ]

    def verifyTasksToState(self, tasks, newState):
        """Check the finished tasks against the state reached from HAS_DATA.

        Raises whatever the ``assert*`` helpers raise when the tasks cannot
        explain ``newState``.
        """
        if newState.equals(AnyState.STATE_EMPTY):
            # NOTE(review): result discarded, so this enforces nothing.
            self.hasSuccess(tasks, TaskDropDb)
            if not self.hasTask(tasks, TaskCreateDb):
                self.assertAtMostOneSuccess(tasks, TaskDropDb)  # TODO: dicy
            return

        if newState.equals(AnyState.STATE_DB_ONLY):  # in DB only
            if not self.hasTask(tasks, TaskCreateDb):  # without a create_db task
                # no drop_db task may be present at all
                self.assertNoTask(tasks, TaskDropDb)
            # NOTE(review): result discarded, so this enforces nothing.
            self.hasSuccess(tasks, TaskDropSuperTable)
            return

        # should be STATE_HAS_DATA
        if not self.hasTask(tasks, TaskCreateDb):  # only if we didn't create one
            # we shouldn't have dropped it
            self.assertNoTask(tasks, TaskDropDb)
        if not self.hasTask(tasks, TaskCreateSuperTable):  # if we didn't create the table
            # we should not have a task that drops it
            self.assertNoTask(tasks, TaskDropSuperTable)
1145

S
Shuduo Sang 已提交
1146

1147
class StateMechine:
    """Tracks the coarse database state and checks each step's transition.

    The current state is found by querying ``dbConn``. ``transition()`` is
    given the tasks of a finished step, re-detects the state and asks the old
    state to verify that those tasks can explain the new one.
    ``pickTaskType()`` draws the next task class, weighted by the state each
    candidate leads to.
    """

    def __init__(self, dbConn):
        self._dbConn = dbConn
        self._curState = self._findCurrentState()  # starting state
        # transition target probabilities, indexed with value of STATE_EMPTY,
        # STATE_DB_ONLY, etc.
        self._stateWeights = [1, 2, 10, 40]

    def getCurrentState(self):
        return self._curState

    def hasDatabase(self):
        return self._curState.canDropDb()  # ha, can drop DB means it has one

    # May be slow, use cautiously...
    def getTaskTypes(self):
        """Return the task classes that can run directly or indirectly from the current state.

        "Directly" means ``canBeginFrom(current state)``; "indirectly" means
        the class can begin from the end state of a direct one. Each class
        appears once.

        Raises:
            RuntimeError: if no task class qualifies.
        """
        allTaskClasses = StateTransitionTask.__subclasses__()  # all state transition tasks
        firstTaskTypes = [tc for tc in allTaskClasses
                          if tc.canBeginFrom(self._curState)]

        # now we have all the tasks that can begin directly from the current
        # state, let's figure out the INDIRECT ones
        taskTypes = firstTaskTypes.copy()  # have to have these
        for task1 in firstTaskTypes:
            endState = task1.getEndState()
            if endState is None:  # does not change end state
                continue
            for tc in allTaskClasses:  # what task can further begin from there?
                # Test against everything gathered so far, so a class reachable
                # through several end states is not listed (and weighted) twice.
                if tc.canBeginFrom(endState) and (tc not in taskTypes):
                    taskTypes.append(tc)

        if len(taskTypes) <= 0:
            raise RuntimeError(
                "No suitable task types found for state: {}".format(
                    self._curState))
        logger.debug(
            "[OPS] Tasks found for state {}: {}".format(
                self._curState,
                [t.__name__ for t in taskTypes]))
        return taskTypes

    def _findCurrentState(self):
        """Query the database and return the matching state object."""
        dbc = self._dbConn
        # we use this to debug how fast/slow it is to do the various queries
        # to find the current DB state
        ts = time.time()
        if not dbc.hasDatabases():  # no database?!
            logger.debug("[STT] empty database found, between {} and {}".format(ts, time.time()))
            return StateEmpty()
        # did not do this when opening connection, and this is NOT the worker
        # thread, which does this on their own
        dbc.use("db")
        if not dbc.hasTables():  # no tables
            logger.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time()))
            return StateDbOnly()

        sTable = DbManager.getFixedSuperTable()
        # NOTE(review): a true hasRegTables() result selects SUPER_TABLE_ONLY,
        # yet the original labels this branch "no regular tables" and the other
        # one "has actual tables". hasRegTables is not visible here; confirm
        # the condition is not inverted.
        if sTable.hasRegTables(dbc):
            logger.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time()))
            return StateSuperTableOnly()
        else:
            logger.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time()))
            return StateHasData()

    def transition(self, tasks):
        """Validate the step that ran ``tasks`` and move to the detected state.

        With an empty ``tasks`` list only the starting state is logged.
        """
        if len(tasks) == 0:  # before 1st step, or otherwise empty
            logger.debug("[STT] Starting State: {}".format(self._curState))
            return  # do nothing

        # this should show up in the server log, separating steps
        self._dbConn.execute("show dnodes")

        # Generic Checks, first based on the start state
        if self._curState.canCreateDb():
            self._curState.assertIfExistThenSuccess(tasks, TaskCreateDb)

        if self._curState.canDropDb():
            if gSvcMgr is None:  # only if we are running as client-only
                self._curState.assertIfExistThenSuccess(tasks, TaskDropDb)

        # Equivalent checks for table create/drop and data insertion are
        # deliberately absent: the original notes say they do not hold, e.g.
        # because the whole DB may be dropped during the same step.

        newState = self._findCurrentState()
        logger.debug("[STT] New DB state determined: {}".format(newState))
        # can old state move to new state through the tasks?
        self._curState.verifyTasksToState(tasks, newState)
        self._curState = newState

    def pickTaskType(self):
        """Pick one runnable task class at random, weighted by its end state."""
        # all the task types we can choose from at current state
        taskTypes = self.getTaskTypes()
        weights = []
        for tt in taskTypes:
            endState = tt.getEndState()
            if endState is not None:
                # TODO: change to a method
                weights.append(self._stateWeights[endState.getValIndex()])
            else:
                # read data task, default to 10: TODO: change to a constant
                weights.append(10)
        i = self._weighted_choice_sub(weights)
        return taskTypes[i]

    # ref:
    # https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/
    def _weighted_choice_sub(self, weights):
        """Return a random index into ``weights``, chosen in proportion to the weights.

        Raises:
            ValueError: if ``weights`` is empty.
        """
        if not weights:
            raise ValueError("Cannot choose from an empty list of weights")
        # TODO: use our dice to ensure it being deterministic?
        rnd = random.random() * sum(weights)
        for i, w in enumerate(weights):
            rnd -= w
            if rnd < 0:
                return i
        # Reached only when no subtraction went negative (e.g. all weights are
        # zero); return a valid index instead of falling through with None.
        return len(weights) - 1
1285

1286
# Manager of the Database Data/Connection
S
Shuduo Sang 已提交
1287 1288 1289 1290


class DbManager():
    """Owns the shared DB connection, the state machine and the value generators.

    The generators hand out table numbers, timestamps ("ticks"), integers,
    floats and strings; tick and integer generation happen under ``self._lock``.
    """

    def __init__(self, resetDb=True):
        """Open the DB connection and build the state machine.

        Args:
            resetDb: when true, call ``resetDb()`` on the connection once it is open.

        Exits the process with status 2 if the connection attempt fails with
        a 'client disconnected' ``ProgrammingError``; other errors are
        reported and re-raised.
        """
        self.tableNumQueue = LinearQueue()
        # datetime.datetime(2019, 1, 1) # initial date time tick
        self._lastTick = self.setupLastTick()
        self._lastInt = 0  # next one is initial integer
        self._lock = threading.RLock()

        if gConfig.connector_type == 'native':
            self._dbConn = DbConn.createNative()
        else:
            self._dbConn = DbConn.createRest()

        try:
            self._dbConn.open()  # may throw taos.error.ProgrammingError: disconnected
        except taos.error.ProgrammingError as err:
            if err.msg != 'client disconnected':
                print("Failed to connect to DB, errno = {}, msg: {}".format(Helper.convertErrno(err.errno), err.msg))
                raise
            # cannot open DB connection
            print(
                "Cannot establish DB connection, please re-run script without parameter, and follow the instructions.")
            sys.exit(2)
        except BaseException:
            print("[=] Unexpected exception")
            raise

        if resetDb:
            self._dbConn.resetDb()  # drop and recreate DB

        # Do this after dbConn is in proper shape
        self._stateMachine = StateMechine(self._dbConn)

    def getDbConn(self):
        return self._dbConn

    def getStateMachine(self) -> StateMechine:
        return self._stateMachine

    # We aim to create a starting time tick, such that, whenever we run our test here once
    # We should be able to safely create 100,000 records, which will not have any repeated time stamp
    # when we re-run the test in 3 minutes (180 seconds), basically we should expand time duration
    # by a factor of 500.
    # TODO: what if it goes beyond 10 years into the future
    # TODO: fix the error as result of above: "tsdb timestamp is out of range"
    def setupLastTick(self):
        """Compute the timestamp from which generated ticks start.

        Takes the seconds elapsed since 2020-06-01, wraps them into a window
        of (8 * 12 * 30 days) / 500 seconds, stretches the result by 500 and
        adds it to 2012-01-01.
        """
        anchor = datetime.datetime(2020, 6, 1)
        now = datetime.datetime.now()
        # maybe a very large number, takes 69 years to exceed Python int range
        elapsedSec = int(now.timestamp() - anchor.timestamp())
        # NOTE: the window below spans 8 * 12 * 30 days, although the original
        # comment describes it as "seconds within 10 years".
        window = 8 * 12 * 30 * 24 * 60 * 60 / 500
        offsetSec = (elapsedSec % window) * 500
        base = datetime.datetime(2012, 1, 1)  # default "keep" is 10 years
        startTick = datetime.datetime.fromtimestamp(base.timestamp() + offsetSec)
        logger.info("Setting up TICKS to start from: {}".format(startTick))
        return startTick

    def pickAndAllocateTable(self):  # pick any table, and "use" it
        return self.tableNumQueue.pickAndAllocate()

    def addTable(self):
        """Push a new entry onto the table-number queue and return its index."""
        with self._lock:
            newIndex = self.tableNumQueue.push()
        return newIndex

    @classmethod
    def getFixedSuperTableName(cls):
        return "fs_table"

    @classmethod
    def getFixedSuperTable(cls):
        fixedName = cls.getFixedSuperTableName()
        return TdSuperTable(fixedName)

    def releaseTable(self, i):  # return the table back, so others can use it
        self.tableNumQueue.release(i)

    def getNextTick(self):
        """Return the next timestamp to use.

        When ``Dice.throw(20)`` yields 0 the value returned is 100 seconds
        before the last tick and the stored tick is left unchanged; otherwise
        the stored tick advances by one second and is returned.
        """
        with self._lock:  # prevent duplicate tick
            if Dice.throw(20) == 0:  # 1 in 20 chance
                # Go back in time 100 seconds
                return self._lastTick + datetime.timedelta(seconds=-100)
            # regular: add one second to it
            self._lastTick += datetime.timedelta(seconds=1)
            return self._lastTick

    def getNextInt(self):
        """Increment the shared counter under the lock and return its new value."""
        with self._lock:
            self._lastInt += 1
            return self._lastInt

    def getNextBinary(self):
        """Return a long fixed string with the next integer appended."""
        suffix = self.getNextInt()
        return "Beijing_Shanghai_Los_Angeles_New_York_San_Francisco_Chicago_Beijing_Shanghai_Los_Angeles_New_York_San_Francisco_Chicago_{}".format(
            suffix)

    def getNextFloat(self):
        """Return 0.9 plus the next integer."""
        return 0.9 + self.getNextInt()

    def getTableNameToDelete(self):
        """Pop a table number and return its table name, or False when the pop yields a falsy value."""
        tblNum = self.tableNumQueue.pop()  # TODO: race condition!
        return "table_{}".format(tblNum) if tblNum else False

    def cleanUp(self):
        self._dbConn.close()

1402

1403
class TaskExecutor():
    """Carries the current step number and a class-wide record of data marks.

    All instances share one ``BoundedList`` (``_boundedList``) holding the
    largest values passed to ``recordDataMark()``.
    """

    class BoundedList:
        """Lock-protected ascending list keeping at most ``size`` values.

        Once the list is non-empty, a value that is not larger than the
        current first (smallest) item is ignored; when the list overflows,
        the smallest item is dropped.
        """

        def __init__(self, size=10):
            self._size = size
            self._list = []
            self._lock = threading.Lock()

        def add(self, n: int):
            """Insert ``n`` in sorted position, evicting the smallest on overflow.

            Raises:
                RuntimeError: if the list is found to hold more than
                    ``size + 1`` items after the insertion.
            """
            with self._lock:
                if not self._list:  # empty
                    self._list.append(n)
                    return

                # Leftmost position whose item is >= n; end of the list when
                # n is larger than everything stored so far.
                insPos = next(
                    (i for i, item in enumerate(self._list) if n <= item),
                    len(self._list))

                # The first item acts as a gate: anything not larger is dropped.
                # TODO: eliminate first item as gating item
                if insPos == 0:
                    return

                self._list.insert(insPos, n)

                newLen = len(self._list)
                if newLen <= self._size:
                    return
                if newLen == (self._size + 1):
                    del self._list[0]  # remove the first (smallest) item
                    return
                raise RuntimeError("Corrupt Bounded List")

        def __str__(self):
            return repr(self._list)

    _boundedList = BoundedList()  # shared by all TaskExecutor instances

    def __init__(self, curStep):
        self._curStep = curStep

    @classmethod
    def getBoundedList(cls):
        """Return the class-wide bounded list of recorded data marks."""
        return cls._boundedList

    def getCurStep(self):
        """Return the step number given at construction."""
        return self._curStep

    def execute(self, task: Task, wt: WorkerThread):
        """Execute ``task`` on worker thread ``wt``."""
        task.execute(wt)

    def recordDataMark(self, n: int):
        """Record ``n`` in the shared bounded list."""
        self._boundedList.add(n)

1460 1461
    # def logInfo(self, msg):
    #     logger.info("    T[{}.x]: ".format(self._curStep) + msg)
1462

1463 1464
    # def logDebug(self, msg):
    #     logger.debug("    T[{}.x]: ".format(self._curStep) + msg)
1465

S
Shuduo Sang 已提交
1466

S
Steven Li 已提交
1467
class Task():
    """Base class for a unit of work executed on a worker thread.

    Sub-classes implement ``_executeInternal()``. ``execute()`` wraps that
    call with logging, error classification and statistics reporting.
    """

    taskSn = 100  # last serial number handed out, shared by all sub-classes

    # Error numbers that are tolerated unconditionally by _isErrAcceptable()
    _ALWAYS_ACCEPTABLE_ERRNOS = frozenset([
        0x05,  # TSDB_CODE_RPC_NOT_READY
        # 0x200, # invalid SQL, TODO: re-examine with TD-934
        0x217,  # "db not selected", client side defined error code
        0x218,  # "Table does not exist" client side defined error code
        0x360, 0x362,
        0x369,  # tag already exists
        0x36A, 0x36B, 0x36D,
        0x381,
        0x380,  # "db not selected"
        0x383,
        0x386,  # DB is being dropped?!
        0x503,
        0x510,  # vnode not in ready state
        0x14,   # db not ready, errno changed
        0x600,
        1000,   # REST catch-all error
    ])

    # Fragments of an "invalid SQL" (0x200) message that make it acceptable
    _ACCEPTABLE_INVALID_SQL_MSGS = (
        "invalid column name",
        "tags number not matched",  # mismatched tags after modification
        "duplicated column names",  # also alter table tag issues
    )

    @classmethod
    def allocTaskNum(cls):
        """Hand out the next task serial number."""
        # IMPORTANT: cannot use cls.taskSn, since each sub class will have a copy
        Task.taskSn += 1
        return Task.taskSn

    def __init__(self, dbManager: DbManager, execStats: ExecutionStats):
        self._dbManager = dbManager
        self._workerThread = None
        self._err = None
        self._aborted = False
        self._curStep = None
        self._numRows = None  # Number of rows affected

        # Assign an incremental task serial number
        self._taskNum = self.allocTaskNum()

        self._execStats = execStats

    def isSuccess(self):
        """Return True when the last execution recorded no error."""
        return self._err is None

    def isAborted(self):
        """Return True when an unexpected error marked this task as aborted."""
        return self._aborted

    def clone(self):  # TODO: why do we need this again?
        """Create a fresh task of the same class, sharing manager and stats."""
        return self.__class__(self._dbManager, self._execStats)

    def logDebug(self, msg):
        """Log ``msg`` at debug level through the worker thread, with a step/task prefix."""
        self._workerThread.logDebug(
            "Step[{}.{}] {}".format(self._curStep, self._taskNum, msg))

    def logInfo(self, msg):
        """Log ``msg`` at info level through the worker thread, with a step/task prefix."""
        self._workerThread.logInfo(
            "Step[{}.{}] {}".format(self._curStep, self._taskNum, msg))

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        """Do the actual work; must be overridden by child classes."""
        raise RuntimeError(
            "To be implemeted by child classes, class name: {}".format(
                self.__class__.__name__))

    def _isErrAcceptable(self, errno, msg):
        """Decide whether a TAOS error may be tolerated.

        Args:
            errno: the (already converted) error number.
            msg: the error message text, inspected for "invalid SQL" errors.

        Returns:
            True when the error is tolerated, False otherwise.
        """
        if errno in self._ALWAYS_ACCEPTABLE_ERRNOS:
            return True  # These are the ALWAYS-ACCEPTABLE ones
        if errno == 0x0B and gConfig.auto_start_service:
            return True  # We may get "network unavilable" when restarting service
        if errno == 0x200:  # invalid SQL, we need to dive in a bit more
            # Note: as before, an unmatched 0x200 is rejected without
            # consulting the service manager below.
            return any(frag in msg for frag in self._ACCEPTABLE_INVALID_SQL_MSGS)
        if gSvcMgr is not None and gSvcMgr.isRestarting():
            logger.info("Ignoring error when service is restarting: errno = {}, msg = {}".format(errno, msg))
            return True

        return False  # Not an acceptable error

    def execute(self, wt: WorkerThread):
        """Run this task on worker thread ``wt`` and record the outcome.

        TAOS ``ProgrammingError``s are stored in ``_err``; they mark the task
        as aborted only when neither ``gConfig.continue_on_exception`` is set
        nor the error is acceptable. Any other exception is stored, marks
        the task as aborted and has its traceback printed. No exception
        raised by ``_executeInternal()`` is propagated from here.
        """
        wt.verifyThreadSelf()
        self._workerThread = wt  # type: ignore

        te = wt.getTaskExecutor()
        self._curStep = te.getCurStep()
        taskName = self.__class__.__name__
        self.logDebug("[-] executing task {}...".format(taskName))

        self._err = None
        self._execStats.beginTaskType(taskName)  # mark beginning
        errno2 = None
        try:
            self._executeInternal(te, wt)  # TODO: no return value?
        except taos.error.ProgrammingError as err:
            errno2 = Helper.convertErrno(err.errno)
            self._err = err
            if gConfig.continue_on_exception:  # user choose to continue
                self.logDebug("[=] Continue after TAOS exception: errno=0x{:X}, msg: {}, SQL: {}".format(
                    errno2, err, wt.getDbConn().getLastSql()))
            elif self._isErrAcceptable(errno2, str(err)):
                self.logDebug("[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(
                    errno2, err, wt.getDbConn().getLastSql()))
                print("_", end="", flush=True)
            else:  # not an acceptable error
                errMsg = "[=] Unexpected Taos library exception ({}): errno=0x{:X}, msg: {}, SQL: {}".format(
                    taskName,
                    errno2, err, wt.getDbConn().getLastSql())
                self.logDebug(errMsg)
                if gConfig.debug:
                    # print rather than re-raise, so that we see the full stack
                    traceback.print_exc()
                print(
                    "\n\n----------------------------\nProgram ABORTED Due to Unexpected TAOS Error: \n\n{}\n".format(errMsg) +
                    "----------------------------\n")
                self._aborted = True
        except Exception as e:
            self.logInfo("Non-TAOS exception encountered")
            self._err = e
            self._aborted = True
            traceback.print_exc()
        except BaseException as e:
            # Catches everything else, so no further handler can follow this one.
            self.logInfo("Python base exception encountered")
            self._err = e
            self._aborted = True
            traceback.print_exc()
        self._execStats.endTaskType(taskName, self.isSuccess())

        self.logDebug("[X] task execution completed, {}, status: {}".format(
            taskName, "Success" if self.isSuccess() else "Failure"))
        # TODO: merge with above.
        self._execStats.incExecCount(taskName, self.isSuccess(), errno2)

    def execSql(self, sql):
        """Execute ``sql`` through the DB manager."""
        return self._dbManager.execute(sql)

    def execWtSql(self, wt: WorkerThread, sql):
        """Execute ``sql`` on the worker thread."""
        return wt.execSql(sql)

    def queryWtSql(self, wt: WorkerThread, sql):
        """Run ``sql`` as a query on the worker thread."""
        return wt.querySql(sql)

    def getQueryResult(self, wt: WorkerThread):
        """Return the worker thread's query result."""
        return wt.getQueryResult()


1625
class ExecutionStats:
    """Collects per-task-class execution counts, error counts and timings."""

    def __init__(self):
        # [total, success] execution counts, keyed by task class name
        self._execTimes: Dict[str, List[int]] = {}
        self._tasksInProgress = 0
        self._lock = threading.Lock()
        self._firstTaskStartTime = None
        self._execStartTime = None
        self._errors = {}  # class name -> {errno: count}
        self._elapsedTime = 0.0  # total elapsed time
        self._accRunTime = 0.0  # accumulated run time

        self._failed = False
        self._failureReason = None

    def __str__(self):
        return "[ExecStats: _failed={}, _failureReason={}]".format(
            self._failed, self._failureReason)

    def isFailed(self):
        """Return True once ``registerFailure()`` has been called."""
        return self._failed

    def startExec(self):
        """Remember the wall-clock time at which execution started."""
        self._execStartTime = time.time()

    def endExec(self):
        """Compute the elapsed wall-clock time since ``startExec()``."""
        self._elapsedTime = time.time() - self._execStartTime

    def incExecCount(self, klassName, isSuccess, eno=None):
        """Count one execution of ``klassName`` and, if given, its error number.

        Args:
            klassName: name of the task class that was executed.
            isSuccess: truthy when the execution succeeded.
            eno: error number to tally for this class, or None.
        """
        # Guarded by the same lock as the in-progress bookkeeping, since the
        # counters are plain dicts/lists updated with read-modify-write.
        with self._lock:
            counts = self._execTimes.setdefault(klassName, [0, 0])
            counts[0] += 1  # index 0 has the "total" execution times
            if isSuccess:
                counts[1] += 1  # index 1 has the "success" execution times
            if eno is not None:
                errors = self._errors.setdefault(klassName, {})
                errors[eno] = errors.get(eno, 0) + 1

    def beginTaskType(self, klassName):
        """Mark the start of a task; starts the busy-time clock for the first one."""
        with self._lock:
            if self._tasksInProgress == 0:  # starting a new round
                self._firstTaskStartTime = time.time()  # I am now the first task
            self._tasksInProgress += 1

    def endTaskType(self, klassName, isSuccess):
        """Mark the end of a task; accumulates busy time when none remain."""
        with self._lock:
            self._tasksInProgress -= 1
            if self._tasksInProgress == 0:  # all tasks have stopped
                self._accRunTime += (time.time() - self._firstTaskStartTime)
                self._firstTaskStartTime = None

    def registerFailure(self, reason):
        """Flag the whole run as failed, remembering ``reason``."""
        self._failed = True
        self._failureReason = reason

    def _avgTaskTime(self):
        """Return accumulated busy time per executed task, 0.0 if none ran."""
        totalExecs = sum(counts[0] for counts in self._execTimes.values())
        if totalExecs == 0:  # nothing executed, avoid dividing by zero
            return 0.0
        return self._accRunTime / totalExecs

    def printStats(self):
        """Log a summary of all collected statistics at info level."""
        logger.info(
            "----------------------------------------------------------------------")
        logger.info(
            "| Crash_Gen test {}, with the following stats:". format(
                "FAILED (reason: {})".format(
                    self._failureReason) if self._failed else "SUCCEEDED"))
        logger.info("| Task Execution Times (success/total):")
        execTimesAny = 0
        for k, n in self._execTimes.items():
            execTimesAny += n[0]
            errStr = None  # shown as "None" when the class had no errors
            if k in self._errors:
                errStr = ", ".join(
                    "0x{:X}:{}".format(eno, cnt)
                    for (eno, cnt) in self._errors[k].items())
            logger.info("|    {0:<24}: {1}/{2} (Errors: {3})".format(k, n[1], n[0], errStr))

        logger.info(
            "| Total Tasks Executed (success or not): {} ".format(execTimesAny))
        logger.info(
            "| Total Tasks In Progress at End: {}".format(
                self._tasksInProgress))
        logger.info(
            "| Total Task Busy Time (elapsed time when any task is in progress): {:.3f} seconds".format(
                self._accRunTime))
        logger.info(
            "| Average Per-Task Execution Time: {:.3f} seconds".format(self._avgTaskTime()))
        logger.info(
            "| Total Elapsed Time (from wall clock): {:.3f} seconds".format(
                self._elapsedTime))
        logger.info("| Top numbers written: {}".format(TaskExecutor.getBoundedList()))
        logger.info("| Total Number of Active DB Native Connections: {}".format(DbConnNative.totalConnections))
        logger.info(
            "----------------------------------------------------------------------")
1720 1721 1722


class StateTransitionTask(Task):
    """Base class for tasks that are selected according to the current state.

    Sub-classes must override ``getEndState()`` and ``canBeginFrom()``.
    """

    LARGE_NUMBER_OF_TABLES = 35
    SMALL_NUMBER_OF_TABLES = 3
    LARGE_NUMBER_OF_RECORDS = 50
    SMALL_NUMBER_OF_RECORDS = 3

    @classmethod
    def getInfo(cls):  # each sub class should supply their own information
        raise RuntimeError("Overriding method expected")

    _endState = None

    @classmethod
    def getEndState(cls):  # TODO: optimize by calling it fewer times
        """Return the state reached after this task; to be overridden."""
        raise RuntimeError("Overriding method expected")

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        """Tell whether this task may start from ``state``; to be overridden."""
        raise RuntimeError("must be overriden")

    @classmethod
    def getRegTableName(cls, i):
        """Return the name of regular table number ``i``."""
        return "reg_table_{}".format(i)

    def execute(self, wt: WorkerThread):
        """Run the task on ``wt``; simply defers to ``Task.execute()``."""
        super().execute(wt)
S
Shuduo Sang 已提交
1756 1757


1758
class TaskCreateDb(StateTransitionTask):
    """Task that issues ``create database db``."""

    @classmethod
    def getEndState(cls):
        return StateDbOnly()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canCreateDb()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        # self.execWtSql(wt, "create database db replica {}".format(Dice.throw(3)+1))
        self.execWtSql(wt, "create database db")
1770

1771
class TaskDropDb(StateTransitionTask):
    """Task that issues ``drop database db`` and logs when it was dropped."""

    @classmethod
    def getEndState(cls):
        return StateEmpty()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canDropDb()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        self.execWtSql(wt, "drop database db")
        logger.debug("[OPS] database dropped at {}".format(time.time()))
1783

S
Shuduo Sang 已提交
1784

1785
class TaskCreateSuperTable(StateTransitionTask):
    """Task that creates the fixed super table, provided a DB is in use."""

    @classmethod
    def getEndState(cls):
        return StateSuperTableOnly()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canCreateFixedSuperTable()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        if not wt.dbInUse():  # no DB yet, to the best of our knowledge
            logger.debug("Skipping task, no DB yet")
            return

        sTable = self._dbManager.getFixedSuperTable()
        # wt.execSql("use db")    # should always be in place
        sTable.create(wt.getDbConn(), {'ts':'timestamp', 'speed':'int'}, {'b':'binary(200)', 'f':'float'})
        # Regular tables are not created here.
1805

S
Steven Li 已提交
1806

1807 1808 1809 1810
class TdSuperTable:
    """Thin wrapper building SQL statements for one super table in ``db``.

    Every method takes the DB connection to use; the object itself only
    stores the super table name.
    """

    def __init__(self, stName):
        self._stName = stName

    def getName(self):
        """Return the super table name."""
        return self._stName

    def create(self, dbc, cols: dict, tags: dict):
        """Create the super table.

        Args:
            dbc: connection on which the statement is executed.
            cols: mapping of column name to column type.
            tags: mapping of tag name to tag type.
        """
        sql = "CREATE TABLE db.{} ({}) TAGS ({})".format(
            self._stName,
            ",".join("{} {}".format(k, v) for (k, v) in cols.items()),
            ",".join("{} {}".format(k, v) for (k, v) in tags.items()))
        dbc.execute(sql)

    def getRegTables(self, dbc: DbConn):
        """Return the first column of ``select TBNAME`` on the super table.

        A ``taos.error.ProgrammingError`` is logged at debug level and
        re-raised.
        """
        try:
            dbc.query("select TBNAME from db.{}".format(self._stName))  # TODO: analyze result set later
        except taos.error.ProgrammingError as err:
            errno2 = Helper.convertErrno(err.errno)
            logger.debug("[=] Failed to get tables from super table: errno=0x{:X}, msg: {}".format(errno2, err))
            raise

        return [row[0] for row in dbc.getQueryResult()]

    def hasRegTables(self, dbc: DbConn):
        """Return True when ``SELECT *`` on the super table yields a positive count.

        Assumes ``dbc.query()`` returns the number of rows -- TODO confirm.
        """
        return dbc.query("SELECT * FROM db.{}".format(self._stName)) > 0

    def ensureTable(self, dbc: DbConn, regTableName: str):
        """Create regular table ``regTableName`` unless the lookup finds it."""
        sql = "select tbname from db.{} where tbname in ('{}')".format(self._stName, regTableName)
        if dbc.query(sql) >= 1:  # reg table exists already
            return
        # NOTE(review): unlike the other statements, this one carries no
        # "db." prefix -- presumably it relies on the connection's current DB.
        sql = "CREATE TABLE {} USING {} tags ({})".format(
            regTableName, self._stName, self._getTagStrForSql(dbc))
        dbc.execute(sql)

    def _getTagStrForSql(self, dbc):
        """Return a comma-separated list of placeholder values, one per tag.

        Raises:
            RuntimeError: for a tag type other than BINARY, FLOAT or INT.
        """
        tagStrs = []
        for tagType in self._getTags(dbc).values():
            if tagType == 'BINARY':
                tagStrs.append("'Beijing-Shanghai-LosAngeles'")
            elif tagType == 'FLOAT':
                tagStrs.append('9.9')
            elif tagType == 'INT':
                tagStrs.append('88')
            else:
                raise RuntimeError("Unexpected tag type: {}".format(tagType))
        return ", ".join(tagStrs)

    def _getTags(self, dbc) -> dict:
        """Return ``{name: type}`` for the rows of DESCRIBE marked as 'TAG'."""
        dbc.query("DESCRIBE {}".format(self._stName))
        stCols = dbc.getQueryResult()
        # row[0] is used as name, row[1] as type, row[3] as the TAG marker
        return {row[0]: row[1] for row in stCols if row[3] == 'TAG'}

    def addTag(self, dbc, tagName, tagType):
        """Add tag ``tagName`` of type ``tagType`` unless it is already there."""
        if tagName in self._getTags(dbc):  # already
            return
        sql = "alter table db.{} add tag {} {}".format(self._stName, tagName, tagType)
        dbc.execute(sql)

    def dropTag(self, dbc, tagName):
        """Drop tag ``tagName`` if the super table has it."""
        if tagName not in self._getTags(dbc):  # don't have this tag
            return
        sql = "alter table db.{} drop tag {}".format(self._stName, tagName)
        dbc.execute(sql)

    def changeTag(self, dbc, oldTag, newTag):
        """Rename ``oldTag`` to ``newTag`` when the former exists and the latter does not."""
        tags = self._getTags(dbc)
        if oldTag not in tags:  # don't have this tag
            return
        if newTag in tags:  # already have this tag
            return
        sql = "alter table db.{} change tag {} {}".format(self._stName, oldTag, newTag)
        dbc.execute(sql)

1890
class TaskReadData(StateTransitionTask):
    """Task that runs a random select expression on every regular table.

    For each regular table of the fixed super table, one expression is
    picked and queried against the regular table and then, except for
    ``stddev``, against the super table.
    """

    # Candidate select expressions; the order matters for Dice.choice()
    _AGG_EXPRS = [
        '*',
        'count(*)',
        'avg(speed)',
        # 'twa(speed)', # TODO: this one REQUIRES a where statement, not reasonable
        'sum(speed)',
        'stddev(speed)',
        # SELECTOR functions
        'min(speed)',
        'max(speed)',
        'first(speed)',
        'last(speed)',
        'top(speed, 50)',  # TODO: not supported?
        'bottom(speed, 50)',  # TODO: not supported?
        'apercentile(speed, 10)',  # TODO: TD-1316
        'last_row(speed)',
        # Transformation Functions
        # 'diff(speed)', # TODO: no supported?!
        'spread(speed)',
    ]  # TODO: add more from 'top'

    @classmethod
    def getEndState(cls):
        return None  # meaning doesn't affect state

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canReadData()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        sTable = self._dbManager.getFixedSuperTable()

        # 1 in 5 chance, simulate a broken connection.
        # TODO: break connection in all situations
        if random.randrange(5) == 0:
            wt.getDbConn().close()
            wt.getDbConn().open()

        dbc = wt.getDbConn()
        for rTbName in sTable.getRegTables(dbc):  # regular tables
            aggExpr = Dice.choice(self._AGG_EXPRS)
            # Placeholder, not used yet; the draw is kept so that the
            # sequence of Dice calls stays as before.
            # TODO: add various kind of WHERE conditions
            filterExpr = Dice.choice([
                None
            ])
            try:
                # Run the query against the regular table first
                dbc.execute("select {} from db.{}".format(aggExpr, rTbName))
                # Then run it against the super table
                if aggExpr not in ['stddev(speed)']:  # TODO: STDDEV not valid for super tables?!
                    dbc.execute("select {} from db.{}".format(aggExpr, sTable.getName()))
            except taos.error.ProgrammingError as err:
                errno2 = Helper.convertErrno(err.errno)
                logger.debug("[=] Read Failure: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, dbc.getLastSql()))
                raise
S
Shuduo Sang 已提交
1942

1943
class TaskDropSuperTable(StateTransitionTask):
    """Task that drops the fixed super table.

    Half of the time the regular tables are first dropped one by one in a
    random order; errors raised by those individual drops are swallowed.
    """

    @classmethod
    def getEndState(cls):
        return StateDbOnly()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canDropFixedSuperTable()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        # 1/2 chance, we'll drop the regular tables one by one, in a randomized
        # sequence
        if Dice.throw(2) == 0:
            tblSeq = list(range(
                2 + (self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES)))
            random.shuffle(tblSeq)
            tickOutput = False  # if we have spitted out a "d" character for "drop regular table"
            isSuccess = True
            for i in tblSeq:
                regTableName = self.getRegTableName(i)  # "reg_table_{}".format(i)
                try:
                    self.execWtSql(wt, "drop table {}".format(
                        regTableName))  # nRows always 0, like MySQL
                except taos.error.ProgrammingError as err:
                    # correcting for strange error number scheme
                    errno2 = Helper.convertErrno(err.errno)
                    if errno2 in [0x362]:  # mnode invalid table name
                        isSuccess = False
                        logger.debug(
                            "[DB] Acceptable error when dropping a table")
                    # Any ProgrammingError, acceptable or not, is swallowed here
                    continue  # try to delete next regular table

                if not tickOutput:
                    tickOutput = True  # Print only one time
                    if isSuccess:
                        print("d", end="", flush=True)
                    else:
                        print("f", end="", flush=True)

        # Drop the super table itself
        tblName = self._dbManager.getFixedSuperTableName()
        self.execWtSql(wt, "drop table db.{}".format(tblName))
1986

S
Shuduo Sang 已提交
1987

1988 1989 1990
class TaskAlterTags(StateTransitionTask):
    """Task that randomly adds, drops or renames a tag on the fixed super table."""

    @classmethod
    def getEndState(cls):
        return None  # meaning doesn't affect state

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canDropFixedSuperTable()  # if we can drop it, we can alter tags

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        dbc = wt.getDbConn()
        sTable = self._dbManager.getFixedSuperTable()
        dice = Dice.throw(4)
        if dice == 0:
            sTable.addTag(dbc, "extraTag", "int")
        elif dice == 1:
            sTable.dropTag(dbc, "extraTag")
        elif dice == 2:
            sTable.dropTag(dbc, "newTag")
        else:  # dice == 3
            sTable.changeTag(dbc, "extraTag", "newTag")

class TaskRestartService(StateTransitionTask):
    """Task that, with a small chance, restarts the service via ``gSvcMgr``.

    Only active when ``gConfig.auto_start_service`` is set. A class-wide
    flag makes sure that at most one restart is in flight at any time.
    """

    _isRunning = False  # class-wide: True while some instance is restarting
    _classLock = threading.Lock()  # guards _isRunning

    @classmethod
    def getEndState(cls):
        return None  # meaning doesn't affect state

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        if gConfig.auto_start_service:
            return state.canDropFixedSuperTable()  # Basically when we have the super table
        return False  # don't run this otherwise

    CHANCE_TO_RESTART_SERVICE = 200

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        if not gConfig.auto_start_service:  # only execute when we are in -a mode
            print("_a", end="", flush=True)
            return

        if Dice.throw(self.CHANCE_TO_RESTART_SERVICE) != 0:  # 1 in N chance
            return

        # The flag must be read and written on the class itself: assigning
        # through ``self`` would only create an instance attribute that no
        # other task instance ever sees.
        owner = TaskRestartService
        with owner._classLock:
            if owner._isRunning:
                print("Skipping restart task, another running already")
                return
            owner._isRunning = True

        try:
            dbc = wt.getDbConn()
            dbc.execute("show databases")  # simple delay, align timing with other workers
            gSvcMgr.restart()
        finally:
            # Always clear the flag, otherwise a failed restart would block
            # all future restarts.
            with owner._classLock:
                owner._isRunning = False
S
Shuduo Sang 已提交
2047

2048
class TaskAddData(StateTransitionTask):
    """Task that inserts records into every regular table, in random order.

    When ``gConfig.record_ops`` is set, each write is announced in
    ``add_log_ready.txt`` before the insert and confirmed in
    ``add_log_done.txt`` afterwards; both files are flushed and fsync'ed
    after every line.
    """

    # Track which table is being actively worked on
    activeTable: Set[int] = set()

    # We use these two files to record operations to DB, useful for power-off
    # tests
    fAddLogReady = None
    fAddLogDone = None

    @classmethod
    def prepToRecordOps(cls):
        """Open the two operation log files once, if recording is enabled."""
        if gConfig.record_ops:
            if cls.fAddLogReady is None:
                logger.info(
                    "Recording in a file operations to be performed...")
                cls.fAddLogReady = open("add_log_ready.txt", "w")
            if cls.fAddLogDone is None:
                logger.info("Recording in a file operations completed...")
                cls.fAddLogDone = open("add_log_done.txt", "w")

    @classmethod
    def getEndState(cls):
        return StateHasData()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canAddData()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        ds = self._dbManager  # Quite DANGEROUS here, may result in multi-thread client access
        nTables = self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES
        nRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS
        sTable = ds.getFixedSuperTable()  # same wrapper for every table

        tblSeq = list(range(nTables))
        random.shuffle(tblSeq)
        for i in tblSeq:
            if i in self.activeTable:  # wow already active
                print("x", end="", flush=True)  # concurrent insertion
            else:
                self.activeTable.add(i)  # marking it active

            try:
                regTableName = self.getRegTableName(i)  # "reg_table_{}".format(i)
                sTable.ensureTable(wt.getDbConn(), regTableName)  # Ensure the table exists

                for _ in range(nRecords):  # number of records per table
                    nextInt = ds.getNextInt()
                    if gConfig.record_ops:
                        self.prepToRecordOps()
                        self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName))
                        self.fAddLogReady.flush()
                        os.fsync(self.fAddLogReady)
                    sql = "insert into {} values ('{}', {});".format(
                        regTableName,
                        ds.getNextTick(), nextInt)
                    self.execWtSql(wt, sql)
                    # Successfully wrote the data into the DB, let's record it
                    # somehow
                    te.recordDataMark(nextInt)
                    if gConfig.record_ops:
                        self.fAddLogDone.write(
                            "Wrote {} to {}\n".format(
                                nextInt, regTableName))
                        self.fAddLogDone.flush()
                        os.fsync(self.fAddLogDone)
            finally:
                # Release the marker even when an insert fails, so a failed
                # task does not leave the table flagged as active for good.
                self.activeTable.discard(i)  # not raising an error, unlike remove
2114 2115


S
Steven Li 已提交
2116 2117
# Deterministic random number generator
class Dice():
    """Deterministic random number source shared by the whole program.

    All draws go through the module-level ``random`` generator. The generator
    must be seeded exactly once via :meth:`seed` before any draw is made, so
    that a run can be reproduced from its seed.
    """

    seeded = False  # class-level flag, flipped by the single allowed seed() call

    @classmethod
    def seed(cls, s):
        """Seed the generator with ``s``.

        Raises:
            RuntimeError: if the generator was already seeded, or if the
                platform RNG fails the determinism check.
        """
        if cls.seeded:
            raise RuntimeError(
                "Cannot seed the random generator more than once")
        cls.verifyRNG()
        random.seed(s)
        cls.seeded = True  # TODO: protect against multi-threading

    @classmethod
    def verifyRNG(cls):
        """Check that seeding with 0 yields the known first three draws.

        Note that this re-seeds the global generator with 0 as a side effect.

        Raises:
            RuntimeError: if the observed sequence differs from the expected one.
        """
        random.seed(0)
        firstDraws = tuple(random.randrange(0, 1000) for _ in range(3))
        if firstDraws != (864, 394, 776):
            raise RuntimeError("System RNG is not deterministic")

    @classmethod
    def throw(cls, stop):
        """Return a random integer in ``[0, stop - 1]``."""
        return cls.throwRange(0, stop)

    @classmethod
    def throwRange(cls, start, stop):
        """Return a random integer in ``[start, stop - 1]``.

        Raises:
            RuntimeError: if the generator has not been seeded yet.
        """
        if not cls.seeded:
            raise RuntimeError("Cannot throw dice before seeding it")
        return random.randrange(start, stop)

    @classmethod
    def choice(cls, cList):
        """Return a random element of the non-empty sequence ``cList``.

        Raises:
            RuntimeError: if the generator has not been seeded yet (same rule
                as :meth:`throwRange`, so every draw is reproducible).
        """
        if not cls.seeded:
            raise RuntimeError("Cannot throw dice before seeding it")
        return random.choice(cList)

S
Steven Li 已提交
2152

S
Steven Li 已提交
2153 2154
class LoggingFilter(logging.Filter):
    """Filter hook for the program's logger; currently admits every record."""

    def filter(self, record: logging.LogRecord):
        """Return True when ``record`` should be emitted."""
        accepted = record.levelno >= logging.INFO  # INFO or above always logs
        if not accepted:
            # Below INFO: the per-prefix suppression (e.g. messages starting
            # with "[TRD]") is switched off for now, so these pass as well.
            accepted = True
        return accepted

S
Shuduo Sang 已提交
2164 2165

class MyLoggingAdapter(logging.LoggerAdapter):
    """Logger adapter that tags every message with the calling thread."""

    def process(self, msg, kwargs):
        """Prefix ``msg`` with ``[<thread ident mod 10000>]``; pass kwargs through."""
        threadTag = threading.get_ident() % 10000
        taggedMsg = "[{}]{}".format(threadTag, msg)
        return taggedMsg, kwargs

S
Shuduo Sang 已提交
2170 2171

class SvcManager:
    """Starts, stops and restarts the TDengine service and pumps its output.

    The service itself is wrapped by a ``ServiceManagerThread`` held in
    ``self.svcMgrThread``; that attribute is ``None`` whenever no service is
    being managed. Signal handlers are not registered here, ``MainExec``
    registers them and forwards to ``sigIntHandler``/``sigUsrHandler``.
    """

    def __init__(self):
        print("Starting TDengine Service Manager")

        self.inSigHandler = False  # True while one of our signal handlers runs
        self.svcMgrThread = None  # ServiceManagerThread, or None if not running
        self._lock = threading.Lock()  # held while starting/stopping the service
        self._isRestarting = False

    def _doMenu(self):
        """Show the interactive menu until a valid choice is typed; return it."""
        while True:
            print("\nInterrupting Service Program, Choose an Action: ")
            print("1: Resume")
            print("2: Terminate")
            print("3: Restart")
            # Remember to update the accepted list below when adding entries
            choice = ""
            while choice == "":  # ignore empty input
                choice = input("Enter Choice: ")
            if choice in ["1", "2", "3"]:
                return choice
            print("Invalid choice, please try again.")

    def sigUsrHandler(self, signalNumber, frame):
        """SIGUSR1 handler: pause and let the operator resume/terminate/restart."""
        print("Interrupting main thread execution upon SIGUSR1")
        if self.inSigHandler:  # already handling a signal
            print("Ignoring repeated SIG...")
            return
        self.inSigHandler = True
        try:
            choice = self._doMenu()
            if choice == "1":
                # TODO: can the sub-process be blocked due to us not reading
                # from queue?
                self.sigHandlerResume()
            elif choice == "2":
                self.stopTaosService()
            elif choice == "3":  # Restart
                self.restart()
            else:
                raise RuntimeError("Invalid menu choice: {}".format(choice))
        finally:
            # Always clear the guard, otherwise one failure here would make
            # every later signal be ignored.
            self.inSigHandler = False

    def sigIntHandler(self, signalNumber, frame):
        """SIGINT/SIGTERM handler: stop the service."""
        print("SvcManager: INT Signal Handler starting...")
        if self.inSigHandler:
            print("Ignoring repeated SIG_INT...")
            return
        self.inSigHandler = True
        try:
            self.stopTaosService()
            print("SvcManager: INT Signal Handler returning...")
        finally:
            self.inSigHandler = False

    def sigHandlerResume(self):
        print("Resuming TDengine service manager thread (main thread)...\n\n")

    def _checkServiceManagerThread(self):
        """Drop the service thread object once it reports itself stopped."""
        if self.svcMgrThread:  # valid svc mgr thread
            if self.svcMgrThread.isStopped():  # done?
                self.svcMgrThread.procIpcBatch()  # one last time. TODO: appropriate?
                self.svcMgrThread = None  # no more

    def _procIpcAll(self):
        """Pump service output until the service is gone and not restarting."""
        while self.isRunning() or self.isRestarting():
            # Work on a snapshot: a signal handler can run between any two
            # statements and set self.svcMgrThread to None (stop/restart).
            svcThread = self.svcMgrThread
            if svcThread is not None:
                svcThread.procIpcBatch()  # regular processing
                self._checkServiceManagerThread()
            elif self.isRestarting():
                print("Service restarting...")
            time.sleep(0.5)  # pause, before next round
        print(
            "Service Manager Thread (with subprocess) has ended, main thread now exiting...")

    def startTaosService(self):
        """Kill any stray ``taosd`` process, then start a managed service.

        Raises:
            RuntimeError: if a managed service already exists.
        """
        with self._lock:
            if self.svcMgrThread:
                raise RuntimeError("Cannot start TAOS service when one may already be running")

            # Find if there's already a taosd service, and then kill it
            for proc in psutil.process_iter():
                try:
                    if proc.name() != 'taosd':
                        continue
                    print("Killing an existing TAOSD process in 2 seconds... press CTRL-C to interrupe")
                    time.sleep(2.0)
                    proc.kill()
                except psutil.NoSuchProcess:
                    # The process exited while we were scanning; nothing to kill
                    continue

            self.svcMgrThread = ServiceManagerThread()  # create the object
            print("Attempting to start TAOS service started, printing out output...")
            self.svcMgrThread.start()
            self.svcMgrThread.procIpcBatch(
                trimToTarget=10,
                forceOutput=True)  # for printing 10 lines
            print("TAOS service started")

    def stopTaosService(self, outputLines=20):
        """Stop the managed service, printing at most ``outputLines`` queued lines.

        Logs a warning and returns if no service is being managed.
        """
        # NOTE(review): this takes the same non-reentrant lock as
        # startTaosService() and is reachable from the signal handlers above;
        # confirm a signal cannot arrive while the lock is already held.
        with self._lock:
            if not self.isRunning():
                logger.warning("Cannot stop TAOS service, not running")
                return

            print("Terminating Service Manager Thread (SMT) execution...")
            self.svcMgrThread.stop()
            if self.svcMgrThread.isStopped():
                self.svcMgrThread.procIpcBatch(outputLines)  # one last time
                self.svcMgrThread = None
                print("End of TDengine Service Output")
                print("----- TDengine Service (managed by SMT) is now terminated -----\n")
            else:
                print("WARNING: SMT did not terminate as expected")

    def run(self):
        """Start the service, pump its output until it ends, then stop it."""
        self.startTaosService()
        self._procIpcAll()  # pump/process all the messages, may encounter SIG + restart
        if self.isRunning():  # if sig handler hasn't destroyed it by now
            self.stopTaosService()  # should have started already

    def restart(self):
        """Stop the service (if running) and start it again."""
        if self._isRestarting:
            logger.warning("Cannot restart service when it's already restarting")
            return

        self._isRestarting = True
        try:
            if self.isRunning():
                self.stopTaosService()
            else:
                logger.warning("Service not running when restart requested")

            self.startTaosService()
        finally:
            # Clear the flag even on failure so that _procIpcAll() can finish
            # and a later restart request is not rejected forever.
            self._isRestarting = False

    def isRunning(self):
        """Return True while a service thread object is being managed."""
        return self.svcMgrThread is not None

    def isRestarting(self):
        """Return True while restart() is in progress."""
        return self._isRestarting

2319 2320 2321 2322 2323
class ServiceManagerThread:
    """Runs the TDengine service as a sub process and collects its output.

    ``start()`` launches a ``TdeSubProcess`` plus two daemon threads: one
    reads the service's stdout line by line into ``self._ipcQueue``, the other
    prints whatever arrives on stderr. ``self._status`` holds one of the
    ``MainExec.STATUS_*`` values (``None`` before ``start()``).
    """

    MAX_QUEUE_SIZE = 10000

    # Line printed by the service once it has finished initializing
    TD_READY_MSG = "TDengine is initialized successfully"

    _ProgressBars = ["--", "//", "||", "\\\\"]

    def __init__(self):
        self._tdeSubProcess = None
        self._thread = None  # stdout reader thread
        self._thread2 = None  # stderr reader thread
        # Replaced by a fresh queue in start(); created here so that
        # procIpcBatch() is a harmless no-op before the service is started.
        self._ipcQueue = Queue()
        self._status = None

    def getStatus(self):
        return self._status

    def isRunning(self):
        return self._status == MainExec.STATUS_RUNNING

    def isStopping(self):
        return self._status == MainExec.STATUS_STOPPING

    def isStopped(self):
        return self._status == MainExec.STATUS_STOPPED

    def start(self):
        """Start the sub process and reader threads, then wait until ready.

        Polls once per second, up to 100 times, for the stdout reader to flip
        the status to RUNNING.

        Raises:
            RuntimeError: if already started, or if the service does not
                report readiness within the polling window.
        """
        if self._thread:
            raise RuntimeError("Unexpected _thread")
        if self._tdeSubProcess:
            raise RuntimeError("TDengine sub process already created/running")

        self._status = MainExec.STATUS_STARTING

        self._tdeSubProcess = TdeSubProcess()
        self._tdeSubProcess.start()

        self._ipcQueue = Queue()
        self._thread = threading.Thread(
            target=self.svcOutputReader,
            args=(self._tdeSubProcess.getStdOut(), self._ipcQueue))
        self._thread.daemon = True  # thread dies with the program
        self._thread.start()

        self._thread2 = threading.Thread(
            target=self.svcErrorReader,
            args=(self._tdeSubProcess.getStdErr(), self._ipcQueue))
        self._thread2.daemon = True  # thread dies with the program
        self._thread2.start()

        # wait for service to start
        for i in range(0, 100):
            time.sleep(1.0)
            # don't pump messages during start up
            print("_zz_", end="", flush=True)
            if self._status == MainExec.STATUS_RUNNING:
                logger.info("[] TDengine service READY to process requests")
                return  # now we've started
        # TODO: handle this better?
        # Show the last 100 queued lines (forced to INFO) before giving up
        self.procIpcBatch(100, True)
        raise RuntimeError("TDengine service did not start successfully")

    def stop(self):
        """Stop the sub process; on success also join the reader threads.

        Can be called from both the main thread and a signal handler.

        Raises:
            RuntimeError: if there is no sub process object to stop.
        """
        print("Terminating TDengine service running as the sub process...")
        if self.isStopped():
            print("Service already stopped")
            return
        if self.isStopping():
            print("Service is already being stopped")
            return
        # Linux will send Control-C generated SIGINT to the TDengine process
        # already, ref:
        # https://unix.stackexchange.com/questions/176235/fork-and-how-signals-are-delivered-to-processes
        if not self._tdeSubProcess:
            raise RuntimeError("sub process object missing")

        self._status = MainExec.STATUS_STOPPING
        retCode = self._tdeSubProcess.stop()
        print("Attempted to stop sub process, got return code: {}".format(retCode))
        if retCode == -11:  # killed by SIGSEGV
            logger.error("[[--ERROR--]]: TDengine service SEGV fault (check core file!)")

        if self._tdeSubProcess.isRunning():  # still running
            print("FAILED to stop sub process, it is still running... pid = {}".format(
                    self._tdeSubProcess.getPid()))
        else:
            self._tdeSubProcess = None  # not running any more
            self.join()  # stop the thread, change the status, etc.

    def join(self):
        """Join both reader threads and mark the service STOPPED.

        Raises:
            RuntimeError: if called while the status is not STOPPING.
        """
        # TODO: sanity check
        if not self.isStopping():
            raise RuntimeError(
                "Unexpected status when ending svc mgr thread: {}".format(
                    self._status))

        if self._thread:
            self._thread.join()
            self._thread = None
            self._status = MainExec.STATUS_STOPPED
            # STD ERR thread
            if self._thread2:
                self._thread2.join()
                self._thread2 = None
        else:
            print("Joining empty thread, doing nothing")

    def _trimQueue(self, targetSize):
        """Discard the oldest queued lines until about ``targetSize`` remain.

        A ``targetSize`` of zero or less disables trimming.
        """
        if targetSize <= 0:
            return  # do nothing
        q = self._ipcQueue
        if q.qsize() <= targetSize:  # no need to trim
            return

        logger.debug("Triming IPC queue to target size: {}".format(targetSize))
        itemsToTrim = q.qsize() - targetSize
        for i in range(0, itemsToTrim):
            try:
                q.get_nowait()
            except Empty:
                break  # queue drained by someone else, no more trimming

    def procIpcBatch(self, trimToTarget=0, forceOutput=False):
        """Log every line currently queued, without blocking.

        Args:
            trimToTarget: if positive, first drop old lines so that at most
                about this many are logged.
            forceOutput: log at INFO instead of DEBUG.
        """
        self._trimQueue(trimToTarget)  # trim if necessary
        # Process all the output generated by the underlying sub process,
        # managed by IO thread
        print("<", end="", flush=True)
        while True:
            try:
                line = self._ipcQueue.get_nowait()  # getting output at fast speed
                self._printProgress("_o")
            except Empty:
                # no more output
                print(".>", end="", flush=True)
                return  # we are done with THIS BATCH
            else:  # got line, printing out
                if forceOutput:
                    logger.info(line)
                else:
                    logger.debug(line)

    def _printProgress(self, msg):  # TODO: assuming 2 chars
        """Print ``msg`` plus a random spinner frame, then back up 4 columns."""
        print(msg, end="", flush=True)
        pBar = self._ProgressBars[Dice.throw(4)]
        print(pBar, end="", flush=True)
        print('\b\b\b\b', end="", flush=True)

    @staticmethod
    def _decodeLine(rawLine):
        """Turn one raw output line (bytes) into stripped text.

        Invalid UTF-8 is reported on stdout and decoded with replacement
        characters, so the caller always gets a ``str`` back.
        """
        try:
            return rawLine.decode("utf-8").rstrip()
        except UnicodeError:
            print("\nNon-UTF8 server output: {}\n".format(rawLine))
            return rawLine.decode("utf-8", errors="replace").rstrip()

    def svcOutputReader(self, out: IO, queue):
        """Thread body: move the service's stdout into ``queue`` until EOF.

        While the status is STARTING, also watches for ``TD_READY_MSG`` and
        flips the status to RUNNING one second after seeing it.
        """
        # Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
        for rawLine in iter(out.readline, b''):
            # Always hand text onwards: leaving bytes here would make the
            # str search below raise TypeError and kill this thread.
            line = self._decodeLine(rawLine)

            # This might block, and then causing "out" buffer to block
            queue.put(line)
            self._printProgress("_i")

            if self._status == MainExec.STATUS_STARTING:  # see if we have started
                if line.find(self.TD_READY_MSG) != -1:  # found
                    logger.info("Waiting for the service to become FULLY READY")
                    time.sleep(1.0)  # wait for the server to truly start. TODO: remove this
                    logger.info("Service is now FULLY READY")
                    self._status = MainExec.STATUS_RUNNING

            # Trim the queue if necessary: TODO: try this 1 out of 10 times
            self._trimQueue(self.MAX_QUEUE_SIZE * 9 // 10)  # trim to 90% size

            if self.isStopping():  # TODO: use thread status instead
                # WAITING for stopping sub process to finish its output
                print("_w", end="", flush=True)

        # EOF on stdout, meaning sub process must have died
        print("\nNo more output from IO thread managing TDengine service")
        out.close()

    def svcErrorReader(self, err: IO, queue):
        """Thread body: print each stderr line of the service until EOF."""
        for line in iter(err.readline, b''):
            print("\nTDengine Service (taosd) ERROR (from stderr): {}".format(line))
        err.close()  # mirror svcOutputReader: release the pipe at EOF
2507

2508 2509

class TdeSubProcess:
    """Thin wrapper around the ``taosd`` child process (a ``subprocess.Popen``)."""

    def __init__(self):
        self.subProcess = None  # Popen object, or None when no child is tracked

    def getStdOut(self):
        return self.subProcess.stdout

    def getStdErr(self):
        return self.subProcess.stderr

    def isRunning(self):
        """Return True while a child process object is tracked.

        This does not poll the child; the object is only dropped by stop().
        """
        return self.subProcess is not None

    def getPid(self):
        return self.subProcess.pid

    def getBuildPath(self):
        """Locate the build tree by searching the project for a ``taosd`` binary.

        The project root is the part of this file's directory that precedes
        "communit" (when the path contains "community") or "tests". The first
        ``taosd`` found outside a "packaging" directory wins, and the trailing
        "/build/bin" is cut from its directory.

        Raises:
            RuntimeError: if no suitable ``taosd`` binary is found.
        """
        selfPath = os.path.dirname(os.path.realpath(__file__))
        marker = "communit" if "community" in selfPath else "tests"
        projPath = selfPath[:selfPath.find(marker)]

        for root, dirs, files in os.walk(projPath):
            if "taosd" not in files:
                continue
            rootRealPath = os.path.dirname(os.path.realpath(root))
            if "packaging" not in rootRealPath:
                return root[:len(root) - len("/build/bin")]
        raise RuntimeError(
            "Cannot find the taosd binary under: {}".format(projPath))

    def start(self):
        """Launch ``taosd -c <cfg>`` with stdout and stderr piped.

        An existing log directory is first renamed with a timestamp suffix.

        Raises:
            RuntimeError: if a child process is already tracked.
        """
        # The file head does not import subprocess by name; import it here so
        # this method does not depend on a star-import providing it.
        import subprocess

        if self.subProcess:  # already there
            raise RuntimeError("Corrupt process state")

        ON_POSIX = 'posix' in sys.builtin_module_names

        buildPath = self.getBuildPath()  # walks the source tree, so do it once
        taosdPath = buildPath + "/build/bin/taosd"
        cfgPath = buildPath + "/test/cfg"

        # Move the old log files out of the way; the directory is not
        # recreated here, TDengine will auto-create it with proper perms.
        logPath = buildPath + "/test/log"
        if os.path.exists(logPath):
            logPathSaved = logPath + "_" + time.strftime('%Y-%m-%d-%H-%M-%S')
            logger.info("Saving old log files to: {}".format(logPathSaved))
            os.rename(logPath, logPathSaved)

        svcCmd = [taosdPath, '-c', cfgPath]
        self.subProcess = subprocess.Popen(
            svcCmd, shell=False,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            # bufsize=1 is not supported in binary mode
            close_fds=ON_POSIX
            )  # no text=True: it interfered with reading EOF

    def stop(self):
        """Stop the child process with SIGINT, waiting up to 10 seconds.

        Returns:
            -1 if no child is tracked; the child's own return code if it had
            already exited; -4 if it exited after our SIGINT; -3 if it did not
            exit within the timeout (the child is then still tracked).
        """
        import subprocess  # see start() for why this is imported locally

        if not self.subProcess:
            print("Sub process already stopped")
            return -1

        retCode = self.subProcess.poll()  # contains real sub process return code
        # poll() returns None while the child is alive; 0 is a valid exit code
        # and must not be mistaken for "still running".
        if retCode is not None:  # process already ended
            self.subProcess = None
            return retCode

        print(
            "Sub process is running, sending SIG_INT and waiting for it to terminate...")
        # sub process should end, then IPC queue should end, causing IO
        # thread to end
        self.subProcess.send_signal(signal.SIGINT)
        try:
            self.subProcess.wait(10)
        except subprocess.TimeoutExpired:
            print("Time out waiting for TDengine service process to exit")
            return -3

        print("TDengine service process terminated successfully from SIG_INT")
        self.subProcess = None
        return -4
2601

2602 2603 2604 2605 2606 2607 2608 2609 2610 2611 2612 2613 2614 2615 2616 2617 2618 2619 2620 2621 2622 2623 2624 2625 2626 2627 2628 2629
class ThreadStacks:
    """Snapshot of the call stacks of the threads alive at construction time."""

    def __init__(self):
        self._allStacks = {}  # thread native id -> extracted stack summary
        allFrames = sys._current_frames()
        for th in threading.enumerate():
            frame = allFrames.get(th.ident)
            if frame is None:
                # Thread appeared after the frame snapshot was taken
                continue
            self._allStacks[th.native_id] = traceback.extract_stack(frame)

    def print(self, filteredEndName=None, filterInternal=False):
        """Print the captured stacks, one block per thread.

        Args:
            filteredEndName: if given, skip threads whose innermost frame is
                a function with this name.
            filterInternal: if True, skip threads whose innermost frame is one
                of a fixed list of housekeeping functions.
        """
        for thNid, stack in self._allStacks.items():  # for each thread
            lastFrame = stack[-1]
            if filteredEndName:
                if lastFrame.name == filteredEndName:  # end matched, filter out
                    continue
            if filterInternal:
                if lastFrame.name in ['wait', 'invoke_excepthook',
                    '_wait',  # The Barrier exception
                    'svcOutputReader',  # the svcMgr thread
                    '__init__']:  # the thread that extracted the stack
                    continue  # ignore
            # Now print
            print("\n<----- Thread Info for ID: {}".format(thNid))
            for frame in stack:
                print("File {filename}, line {lineno}, in {name}".format(
                    filename=frame.filename, lineno=frame.lineno, name=frame.name))
                print("    {}".format(frame.line))
            print("-----> End of Thread Info\n")
S
Shuduo Sang 已提交
2630

2631 2632 2633
class ClientManager:
    """Drives one client-side test run against the database.

    ``self.tc`` is the ``ThreadCoordinator`` of the current run; it stays
    ``None`` until ``run()`` has created it. Signal handlers are registered by
    ``MainExec``, which forwards to the handlers below.
    """

    def __init__(self):
        print("Starting service manager")

        self._status = MainExec.STATUS_RUNNING
        self.tc = None

        self.inSigHandler = False  # True while sigUsrHandler is executing

    def sigIntHandler(self, signalNumber, frame):
        """SIGINT/SIGTERM handler: ask the run to stop; exit on a repeat."""
        if self._status != MainExec.STATUS_RUNNING:
            print("Repeated SIGINT received, forced exit...")
            sys.exit(-1)
        self._status = MainExec.STATUS_STOPPING  # immediately set our status

        print("ClientManager: Terminating program...")
        if self.tc is None:
            # The signal arrived before run() created the coordinator, so
            # there is nothing to wind down gracefully; just leave.
            sys.exit(-1)
        self.tc.requestToStop()

    def _doMenu(self):
        """Show the interactive menu until a valid choice is typed; return it."""
        while True:
            print("\nInterrupting Client Program, Choose an Action: ")
            print("1: Resume")
            print("2: Terminate")
            print("3: Show Threads")
            # Remember to update the accepted list below when adding entries
            choice = ""
            while choice == "":  # ignore empty input
                choice = input("Enter Choice: ")
            if choice in ["1", "2", "3"]:
                return choice
            print("Invalid choice, please try again.")

    def sigUsrHandler(self, signalNumber, frame):
        """SIGUSR1 handler: pause and offer resume/terminate/show-threads."""
        print("Interrupting main thread execution upon SIGUSR1")
        if self.inSigHandler:  # already handling a signal
            print("Ignoring repeated SIG_USR1...")
            return
        self.inSigHandler = True
        try:
            choice = self._doMenu()
            if choice == "1":
                print("Resuming execution...")
                time.sleep(1.0)
            elif choice == "2":
                print("Not implemented yet")
                time.sleep(1.0)
            elif choice == "3":
                ts = ThreadStacks()
                ts.print()
            else:
                raise RuntimeError("Invalid menu choice: {}".format(choice))
        finally:
            # Always clear the guard, otherwise one failure here would make
            # every later SIGUSR1 be ignored.
            self.inSigHandler = False

    def _printLastNumbers(self):  # to verify data durability
        """Print the top values currently stored in the "speed" column.

        Returns silently when there is no database besides the default one,
        or when database "db" has no tables.
        """
        dbManager = DbManager(resetDb=False)
        dbc = dbManager.getDbConn()
        if dbc.query("show databases") <= 1:  # no database (we have a default called "log")
            return
        dbc.execute("use db")
        if dbc.query("show tables") == 0:  # no tables
            return

        sTbName = dbManager.getFixedSuperTableName()

        # get all regular tables
        # TODO: analyze result set later
        dbc.query("select TBNAME from db.{}".format(sTbName))
        rTables = dbc.getQueryResult()

        bList = TaskExecutor.BoundedList()
        for rTbName in rTables:  # regular tables
            dbc.query("select speed from db.{}".format(rTbName[0]))
            numbers = dbc.getQueryResult()
            for row in numbers:
                bList.add(row[0])

        print("Top numbers in DB right now: {}".format(bList))
        print("TDengine client execution is about to start in 2 seconds...")
        time.sleep(2.0)
        dbManager = None  # release?

    def prepare(self):
        self._printLastNumbers()

    def run(self, svcMgr):
        """Execute the test run and return a process exit code.

        Args:
            svcMgr: service manager to stop once the run is over, or a falsy
                value when the service is not managed by this program.

        Returns:
            1 if the thread coordinator reports failure, otherwise 0.
        """
        self._printLastNumbers()

        dbManager = DbManager()  # Regular function
        thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps)
        self.tc = ThreadCoordinator(thPool, dbManager)

        self.tc.run()
        if svcMgr:  # gConfig.auto_start_service:
            svcMgr.stopTaosService()
        # Print exec status, etc., AFTER showing messages from the server
        self.conclude()
        # Linux return code: ref https://shapeshed.com/unix-exit-codes/
        return 1 if self.tc.isFailed() else 0

    def conclude(self):
        """Print the run statistics and clean up the DB manager."""
        self.tc.printStats()
        self.tc.getDbManager().cleanUp()
2746 2747

class MainExec:
    """Top-level driver: owns the client/service managers and the signals.

    The ``STATUS_*`` constants are the life-cycle states shared by the
    manager classes in this file.
    """

    STATUS_STARTING = 1
    STATUS_RUNNING = 2
    STATUS_STOPPING = 3
    STATUS_STOPPED = 4

    def __init__(self):
        self._clientMgr = None
        self._svcMgr = None

        signal.signal(signal.SIGTERM, self.sigIntHandler)
        signal.signal(signal.SIGINT,  self.sigIntHandler)
        signal.signal(signal.SIGUSR1, self.sigUsrHandler)  # different handler!

    def sigUsrHandler(self, signalNumber, frame):
        """Forward SIGUSR1 to the client manager, else to the service manager."""
        if self._clientMgr:
            self._clientMgr.sigUsrHandler(signalNumber, frame)
        elif self._svcMgr:  # Only if no client mgr, we are running alone
            self._svcMgr.sigUsrHandler(signalNumber, frame)

    def sigIntHandler(self, signalNumber, frame):
        """Forward SIGINT/SIGTERM to the service manager, then the client manager."""
        if self._svcMgr:
            self._svcMgr.sigIntHandler(signalNumber, frame)
        if self._clientMgr:
            self._clientMgr.sigIntHandler(signalNumber, frame)

    def runClient(self):
        """Run the client test, auto-starting the service when configured.

        Returns:
            The client manager's exit code, or ``None`` if the run was
            abandoned because the REST connection could not be opened.
        """
        global gSvcMgr
        if gConfig.auto_start_service:
            self._svcMgr = SvcManager()
            gSvcMgr = self._svcMgr  # hack alert
            self._svcMgr.startTaosService()  # we start, don't run

        self._clientMgr = ClientManager()
        ret = None
        try:
            ret = self._clientMgr.run(self._svcMgr)  # stop TAOS service inside
        except requests.exceptions.ConnectionError as err:
            # The exception has no getMessage(); format the exception itself,
            # otherwise this handler would raise AttributeError.
            logger.warning("Failed to open REST connection to DB: {}".format(err))
            # don't raise
        return ret

    def runService(self):
        """Run the service manager in the foreground until it ends."""
        global gSvcMgr
        self._svcMgr = SvcManager()
        gSvcMgr = self._svcMgr  # save it in a global variable TODO: hack alert

        self._svcMgr.run()  # run to some end state
        self._svcMgr = None
        gSvcMgr = None

    def runTemp(self):  # for debugging purposes
        """Scratch entry point for ad-hoc debugging; currently does nothing.

        Earlier experiments lived here as commented-out code (writing to and
        reading from every regular table to exercise the read-from-disk path,
        and polling "show databases" in a loop); none of it was executed.
        """
        return

S
Steven Li 已提交
2841

2842
def main():
    """Parse the command line, set up logging, and run the crash generator.

    Side effects:
        * Rebinds the module-level ``gConfig`` to the parsed argument
          namespace.
        * Rebinds the module-level ``logger`` to a ``MyLoggingAdapter``
          wrapping the 'CrashGen' logger, at DEBUG level when ``--debug``
          is given and INFO otherwise.
        * Seeds ``Dice`` with 0.

    Returns:
        Whatever ``MainExec.runClient()`` returns when running as a client.
        When ``--run-tdengine`` is given, ``MainExec.runService()`` is called
        instead and ``None`` is returned.
    """
    # Super cool Python argument library:
    # https://docs.python.org/3/library/argparse.html
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=textwrap.dedent('''\
            TDengine Auto Crash Generator (PLEASE NOTICE the Prerequisites Below)
            ---------------------------------------------------------------------
            1. You build TDengine in the top level ./build directory, as described in official docs
            2. You run the server there before this script: ./build/bin/taosd -c test/cfg

            '''))

    # Options that carry a value use argparse's %(default)s placeholder so the
    # help text is always generated from the real default and cannot drift.
    parser.add_argument(
        '-a',
        '--auto-start-service',
        action='store_true',
        help='Automatically start/stop the TDengine service (default: false)')
    parser.add_argument(
        '-c',
        '--connector-type',
        action='store',
        default='native',
        type=str,
        help='Connector type to use: native, rest, or mixed (default: %(default)s)')
    parser.add_argument(
        '-d',
        '--debug',
        action='store_true',
        help='Turn on DEBUG mode for more logging (default: false)')
    parser.add_argument(
        '-e',
        '--run-tdengine',
        action='store_true',
        help='Run TDengine service in foreground (default: false)')
    parser.add_argument(
        '-l',
        '--larger-data',
        action='store_true',
        help='Write larger amount of data during write operations (default: false)')
    # NOTE(review): the previous help text read "Use a single shared db
    # connection", which contradicts the flag name; confirm against the code
    # that reads gConfig.per_thread_db_connection.
    parser.add_argument(
        '-p',
        '--per-thread-db-connection',
        action='store_true',
        help='Use a separate db connection per thread instead of a single shared one (default: false)')
    parser.add_argument(
        '-r',
        '--record-ops',
        action='store_true',
        help='Use a pair of always-fsynced files to record operations performing + performed, for power-off tests (default: false)')
    parser.add_argument(
        '-s',
        '--max-steps',
        action='store',
        default=1000,
        type=int,
        help='Maximum number of steps to run (default: %(default)s)')
    parser.add_argument(
        '-t',
        '--num-threads',
        action='store',
        default=5,
        type=int,
        help='Number of threads to run (default: %(default)s)')
    parser.add_argument(
        '-x',
        '--continue-on-exception',
        action='store_true',
        help='Continue execution after encountering unexpected/disallowed errors/exceptions (default: false)')

    global gConfig
    gConfig = parser.parse_args()

    # Logging Stuff
    global logger
    _logger = logging.getLogger('CrashGen')  # real logger
    _logger.addFilter(LoggingFilter())
    ch = logging.StreamHandler()
    _logger.addHandler(ch)

    # Logging adapter, to be used as a logger
    logger = MyLoggingAdapter(_logger, [])

    if gConfig.debug:
        logger.setLevel(logging.DEBUG)  # default seems to be INFO
    else:
        logger.setLevel(logging.INFO)

    Dice.seed(0)  # initial seeding of dice

    # Run server or client
    mExec = MainExec()
    if gConfig.run_tdengine:  # run server
        mExec.runService()
    else:
        return mExec.runClient()
2959

S
Shuduo Sang 已提交
2960

2961
if __name__ == "__main__":
    # Script entry point: use main()'s return value as the process exit status.
    sys.exit(main())