crash_gen.py 124.1 KB
Newer Older
S
Shuduo Sang 已提交
1
# -----!/usr/bin/python3.7
S
Steven Li 已提交
2 3 4 5 6 7 8 9 10 11 12 13
###################################################################
#           Copyright (c) 2016 by TAOS Technologies, Inc.
#                     All rights reserved.
#
#  This file is proprietary and confidential to TAOS Technologies.
#  No part of this file may be reproduced, stored, transmitted,
#  disclosed or used in any form or by any means other than as
#  expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-
S
Shuduo Sang 已提交
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
# For type hinting before definition, ref:
# https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel
from __future__ import annotations
import taos
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.log import *
from queue import Queue, Empty
from typing import IO
from typing import Set
from typing import Dict
from typing import List
from requests.auth import HTTPBasicAuth
import textwrap
import datetime
import logging
import time
import random
import threading
import requests
import copy
import argparse
import getopt
38

S
Steven Li 已提交
39
import sys
40
import os
41 42
import io
import signal
43
import traceback
44 45 46
import resource
from guppy import hpy
import gc
47
import subprocess
48 49 50 51 52 53 54

# psutil is a hard requirement: exit early with an install hint if it is missing.
try:
    import psutil
except ImportError:  # a bare `except:` would also swallow KeyboardInterrupt/SystemExit
    print("Psutil module needed, please install: sudo pip3 install psutil")
    sys.exit(-1)

# Require Python 3. (`from __future__ import annotations` at the top of this
# file already needs Python 3.7+, so this is only a belt-and-braces check.)
if sys.version_info[0] < 3:
    raise Exception("Must be using Python 3")

S
Shuduo Sang 已提交
59
# Global variables, tried to keep a small number.
60 61 62

# Command-line/Environment Configurations, will set a bit later
# ConfigNameSpace = argparse.Namespace
63 64 65 66
# Annotation-only declarations: no value is bound here, so each name must be
# assigned elsewhere before first use. Thanks to `from __future__ import
# annotations` these types are not evaluated, which is why classes defined
# later (or elsewhere) can be named here.
gConfig:    argparse.Namespace 
gSvcMgr:    ServiceManager # TODO: refactor this hack, use dep injection
logger:     logging.Logger
gContainer: Container
S
Steven Li 已提交
67

68 69
# def runThread(wt: WorkerThread):
#     wt.run()
70

71 72
class CrashGenError(Exception):
    """Error raised by the crash generator itself.

    :param msg: human-readable message, also what str() returns
    :param errno: optional numeric error code
    """

    def __init__(self, msg=None, errno=None):
        # Hand both values to Exception so .args (and therefore repr())
        # reflects them; the original skipped the base initializer.
        super().__init__(msg, errno)
        self.msg = msg
        self.errno = errno

    def __str__(self):
        # __str__ must return a str. Returning self.msg directly made
        # str(err) raise TypeError whenever no message was supplied.
        return "" if self.msg is None else str(self.msg)

S
Steven Li 已提交
79
class WorkerThread:
    """One worker thread that executes a single task per step, in lock-step
    with the other workers, under the control of a ThreadCoordinator.

    Each step the thread crosses the coordinator's shared barrier, waits at
    its own "step gate" until the main thread taps it, then fetches one task
    from the coordinator, executes it and hands it back via saveExecutedTask().
    """

    # Error numbers (after Helper.convertErrno) tolerated when (re)opening the
    # per-thread connection: invalid database, database dropping, unable to
    # establish connection, database not ready.
    _IGNORABLE_OPEN_ERRNOS = (0x383, 0x386, 0x00B, 0x014)

    def __init__(self, pool: ThreadPool, tid, tc: ThreadCoordinator):  # note: main thread context!
        """Create (but do not start) a worker.

        :param pool: the ThreadPool this worker belongs to
        :param tid: worker id, used in log messages
        :param tc: coordinator providing the step barrier and the tasks
        :raises RuntimeError: for an unknown gConfig.connector_type (only
            checked when gConfig.per_thread_db_connection is set)
        """
        self._pool = pool
        self._tid = tid
        self._tc = tc  # type: ThreadCoordinator
        self._thread = threading.Thread(target=self.run)
        self._stepGate = threading.Event()

        # Let us have a DB connection of our own (it is opened in run())
        if gConfig.per_thread_db_connection:  # type: ignore
            self._dbConn = self._createDbConn()

    @staticmethod
    def _createDbConn() -> DbConn:
        """Build an unopened connection of the kind named by gConfig.connector_type."""
        connType = gConfig.connector_type
        if connType == 'native':
            return DbConn.createNative()
        if connType == 'rest':
            return DbConn.createRest()
        if connType == 'mixed':  # 1/2 chance of either kind
            return DbConn.createNative() if Dice.throw(2) == 0 else DbConn.createRest()
        raise RuntimeError("Unexpected connector type: {}".format(connType))

    def logDebug(self, msg):
        logger.debug("    TRD[{}] {}".format(self._tid, msg))

    def logInfo(self, msg):
        logger.info("    TRD[{}] {}".format(self._tid, msg))

    def getTaskExecutor(self):
        """Return the coordinator's TaskExecutor for the current step (None when stopping)."""
        return self._tc.getTaskExecutor()

    def start(self):
        self._thread.start()  # AFTER the thread is recorded

    def run(self):
        """Thread body: open the per-thread connection (if configured), run the
        step loop, and always release the connection on the way out."""
        # initialization after thread starts, in the thread context
        logger.info("Starting to run thread: {}".format(self._tid))

        if gConfig.per_thread_db_connection:  # type: ignore
            logger.debug("Worker thread openning database connection")
            self._dbConn.open()

        try:
            self._doTaskLoop()
        finally:
            # Clean up even if the task loop died with an exception; the
            # original skipped this and leaked the connection in that case.
            if gConfig.per_thread_db_connection:  # type: ignore
                if self._dbConn.isOpen:  # sometimes it is not open
                    self._dbConn.close()
                else:
                    logger.warning("Cleaning up worker thread, dbConn already closed")

    def _ensureDbConnOpen(self):
        """Reopen the per-thread connection if it was closed (e.g. during a
        server auto-restart), ignoring the transient errors in
        _IGNORABLE_OPEN_ERRNOS and re-raising any other ProgrammingError."""
        if not gConfig.per_thread_db_connection:  # most likely TRUE
            return
        try:
            if not self._dbConn.isOpen:  # might have been closed during server auto-restart
                self._dbConn.open()
        except taos.error.ProgrammingError as err:
            errno = Helper.convertErrno(err.errno)
            if errno not in self._IGNORABLE_OPEN_ERRNOS:
                print("\nCaught programming error. errno=0x{:X}, msg={} ".format(errno, err.msg))
                raise

    def _doTaskLoop(self):
        """Run steps until the coordinator stops or the step barrier breaks."""
        tc = self._tc  # Thread Coordinator, the overall master
        while True:
            try:
                tc.crossStepBarrier()  # shared barrier first, INCLUDING the last one
            except threading.BrokenBarrierError:  # main thread timed out
                print("_bto", end="")
                logger.debug("[TRD] Worker thread exiting due to main thread barrier time-out")
                break

            logger.debug("[TRD] Worker thread [{}] exited barrier...".format(self._tid))
            self.crossStepGate()   # then per-thread gate, after being tapped
            logger.debug("[TRD] Worker thread [{}] exited step gate...".format(self._tid))
            if not tc.isRunning():
                print("_wts", end="")
                logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...")
                break

            self._ensureDbConnOpen()

            # Fetch a task from the Thread Coordinator, then execute it
            logger.debug("[TRD] Worker thread [{}] about to fetch task".format(self._tid))
            task = tc.fetchTask()
            logger.debug("[TRD] Worker thread [{}] about to execute task: {}".format(
                self._tid, task.__class__.__name__))
            task.execute(self)
            tc.saveExecutedTask(task)
            logger.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid))

    def verifyThreadSelf(self):  # ensure we are called by this own thread
        if threading.get_ident() != self._thread.ident:
            raise RuntimeError("Unexpectly called from other threads")

    def verifyThreadMain(self):  # ensure we are called by the main thread
        if threading.get_ident() != threading.main_thread().ident:
            raise RuntimeError("Unexpectly called from other threads")

    def verifyThreadAlive(self):
        if not self._thread.is_alive():
            raise RuntimeError("Unexpected dead thread")

    def crossStepGate(self):
        """Block at this worker's private gate until the main thread taps it.

        Unlike the shared barrier, a gate is released per thread through
        tapStepGate(). The event is cleared after waking so the next step
        blocks again. Must be called from this worker's own thread.
        """
        self.verifyThreadAlive()
        self.verifyThreadSelf()  # only allowed by ourselves

        # Wait again at the "gate", waiting to be "tapped"
        logger.debug(
            "[TRD] Worker thread {} about to cross the step gate".format(
                self._tid))
        self._stepGate.wait()
        self._stepGate.clear()

    def tapStepGate(self):  # give it a tap, release the thread waiting there
        """Release this worker from its gate; main thread only. Prints "_tad"
        instead when the worker thread is already dead."""
        self.verifyThreadMain()  # only allowed for main thread

        if self._thread.is_alive():
            logger.debug("[TRD] Tapping worker thread {}".format(self._tid))
            self._stepGate.set()  # wake up!
            time.sleep(0)  # let the released thread run a bit
        else:
            print("_tad", end="")  # Thread already dead

    def execSql(self, sql):  # TODO: expose DbConn directly
        return self.getDbConn().execute(sql)

    def querySql(self, sql):  # TODO: expose DbConn directly
        return self.getDbConn().query(sql)

    def getQueryResult(self):
        return self.getDbConn().getQueryResult()

    def getDbConn(self) -> DbConn:
        """Return this worker's own connection when gConfig.per_thread_db_connection
        is set, otherwise the connection shared through the coordinator's DbManager."""
        if gConfig.per_thread_db_connection:
            return self._dbConn
        return self._tc.getDbManager().getDbConn()
253

254
# The coordinator of all worker threads, mostly running in main thread
S
Shuduo Sang 已提交
255 256


257
class ThreadCoordinator:
    """Drives all worker threads step by step; runs mostly in the main thread.

    Each step the main thread meets every worker at the shared barrier,
    inspects the tasks executed in the previous step, transitions each
    database's state machine accordingly, then installs a new TaskExecutor
    and taps every worker through its private gate.
    """

    WORKER_THREAD_TIMEOUT = 180  # seconds (three minutes) the main thread waits at the barrier

    def __init__(self, pool: ThreadPool, dbManager: DbManager):
        self._curStep = -1  # first step is 0
        self._pool = pool
        self._te = None  # prepare for every new step
        self._dbManager = dbManager
        self._executedTasks: List[Task] = []  # in a given step
        self._lock = threading.RLock()  # sync access for a few things

        # One barrier for all worker threads plus the main thread
        self._stepBarrier = threading.Barrier(self._pool.numThreads + 1)
        self._execStats = ExecutionStats()
        self._runStatus = MainExec.STATUS_RUNNING
        self._initDbs()

    def getTaskExecutor(self):
        return self._te

    def getDbManager(self) -> DbManager:
        return self._dbManager

    def crossStepBarrier(self, timeout=None):
        self._stepBarrier.wait(timeout)

    def requestToStop(self):
        """Make the main loop stop at its next check, recording a user interruption."""
        self._runStatus = MainExec.STATUS_STOPPING
        self._execStats.registerFailure("User Interruption")

    def _runShouldEnd(self, transitionFailed, hasAbortedTask, workerTimeout):
        """True once max_steps is reached, a stop was requested, or any failure flag is set."""
        maxSteps = gConfig.max_steps  # type: ignore
        if self._curStep >= (maxSteps - 1):  # maxStep==10, last curStep should be 9
            return True
        if self._runStatus != MainExec.STATUS_RUNNING:
            return True
        return bool(transitionFailed or hasAbortedTask or workerTimeout)

    def _hasAbortedTask(self):  # from execution of previous step
        return any(task.isAborted() for task in self._executedTasks)

    def _releaseAllWorkerThreads(self, transitionFailed):
        """Advance to the next step and tap every worker out of its gate.

        A new TaskExecutor is installed only if the transition succeeded;
        otherwise _te stays None, which workers read as "stop".
        """
        self._curStep += 1  # we are about to get into next step. TODO: race condition here!
        # Now not all threads had time to go to sleep
        logger.debug(
            "--\r\n\n--> Step {} starts with main thread waking up".format(self._curStep))

        # A new TE for the new step
        self._te = None  # set to empty first, to signal worker thread to stop
        if not transitionFailed:  # only if not failed
            self._te = TaskExecutor(self._curStep)

        logger.debug("[TRD] Main thread waking up at step {}, tapping worker threads".format(
            self._curStep))  # Now not all threads had time to go to sleep
        # Worker threads will wake up at this point, and each execute it's own task
        self.tapAllThreads()  # release all worker thread from their "gates"

    def _syncAtBarrier(self):
        """Meet the workers at the shared barrier (with a time-out); afterwards
        the workers should be waiting at their per-thread gates.

        :raises threading.BrokenBarrierError: if the barrier wait times out
        """
        logger.debug("[TRD] Main thread about to cross the barrier")
        self.crossStepBarrier(timeout=self.WORKER_THREAD_TIMEOUT)
        self._stepBarrier.reset()  # Other worker threads should now be at the "gate"
        logger.debug("[TRD] Main thread finished crossing the barrier")

    def _doTransition(self):
        """End the current step: transition every database's state machine
        using the tasks executed against it, then clear the executed tasks.

        :returns: True if the transition failed on a broken connection
            ('network unavailable'), else False
        :raises taos.error.ProgrammingError: for any other TAOS error
        """
        transitionFailed = False
        try:
            for db in self._dbs:  # type: Database
                sm = db.getStateMachine()
                logger.debug("[STT] starting transitions for DB: {}".format(db.getName()))
                # at end of step, transition the DB state
                tasksForDb = db.filterTasks(self._executedTasks)
                sm.transition(tasksForDb, self.getDbManager().getDbConn())
                logger.debug("[STT] transition ended for DB: {}".format(db.getName()))
            # Due to a limitation of the TD Python library, connections cannot
            # be shared across threads, so the main thread (us) never touches
            # connections created by the workers.
        except taos.error.ProgrammingError as err:
            if err.msg == 'network unavailable':  # broken DB connection
                logger.info("DB connection broken, execution failed")
                # print_exc() shows the failing exception; print_stack() (used
                # before) only showed this handler's own call stack.
                traceback.print_exc()
                transitionFailed = True
                self._te = None  # Not running any more
                self._execStats.registerFailure("Broken DB Connection")
                # Don't bail out: all threads still need to be tapped at the
                # end, and maybe signalled to stop
            else:
                raise

        self.resetExecutedTasks()  # clear the tasks after we are done
        # Get ready for next step
        logger.debug("<-- Step {} finished, trasition failed = {}".format(self._curStep, transitionFailed))
        return transitionFailed

    def run(self):
        """Run the whole test: start the workers, drive them step by step until
        _runShouldEnd() says stop, then release and join them all."""
        self._pool.createAndStartThreads(self)

        # Coordinate all threads step by step
        self._curStep = -1  # not started yet
        self._execStats.startExec()  # start the stop watch
        transitionFailed = False
        hasAbortedTask = False
        workerTimeout = False
        while not self._runShouldEnd(transitionFailed, hasAbortedTask, workerTimeout):
            if not gConfig.debug:  # print this only if we are not in debug mode
                print(".", end="", flush=True)

            try:
                self._syncAtBarrier()  # For now just cross the barrier
            except threading.BrokenBarrierError:
                logger.info("Main loop aborted, caused by worker thread time-out")
                self._execStats.registerFailure("Aborted due to worker thread timeout")
                print("\n\nWorker Thread time-out detected, important thread info:")
                ts = ThreadStacks()
                ts.print(filterInternal=True)
                workerTimeout = True
                break

            # All threads are now past the shared "barrier" and before their
            # per-thread "gate": use this QUIET period for house keeping.
            hasAbortedTask = self._hasAbortedTask()  # from previous step
            if hasAbortedTask:
                logger.info("Aborted task encountered, exiting test program")
                self._execStats.registerFailure("Aborted Task Encountered")
                break  # do transition only if tasks are error free

            # Ending previous step
            try:
                transitionFailed = self._doTransition()  # To start, we end step -1 first
            except taos.error.ProgrammingError as err:
                transitionFailed = True
                errno2 = Helper.convertErrno(err.errno)  # correct error scheme
                errMsg = "Transition failed: errno=0x{:X}, msg: {}".format(errno2, err)
                logger.info(errMsg)
                traceback.print_exc()
                self._execStats.registerFailure(errMsg)

            # Then we move on to the next step
            self._releaseAllWorkerThreads(transitionFailed)

        if hasAbortedTask or transitionFailed:  # abnormal ending, workers waiting at "gate"
            logger.debug("Abnormal ending of main thraed")
        elif workerTimeout:
            logger.debug("Abnormal ending of main thread, due to worker timeout")
        else:  # regular ending, workers waiting at "barrier"
            logger.debug("Regular ending, main thread waiting for all worker threads to stop...")
            self._syncAtBarrier()

        self._te = None  # No more executor, time to end
        logger.debug("Main thread tapping all threads one last time...")
        self.tapAllThreads()  # Let the threads run one last time

        logger.debug("\r\n\n--> Main thread ready to finish up...")
        logger.debug("Main thread joining all threads")
        self._pool.joinAll()  # Get all threads to finish
        logger.info("\nAll worker threads finished")
        self._execStats.endExec()

    def cleanup(self):  # free resources
        """Release the pool and drop references to everything this object holds."""
        self._pool.cleanup()

        self._pool = None
        self._te = None
        self._dbManager = None
        self._dbs = None  # previously kept alive after cleanup
        self._executedTasks = None
        self._lock = None
        self._stepBarrier = None
        self._execStats = None
        self._runStatus = None

    def printStats(self):
        self._execStats.printStats()

    def isFailed(self):
        return self._execStats.isFailed()

    def getExecStats(self):
        return self._execStats

    def tapAllThreads(self):
        """Tap every worker's step gate, in a Dice-randomized order.

        Each thread index is appended or prepended on a coin flip, so whether
        the order is reproducible depends on how Dice is seeded.
        """
        wakeSeq = []
        for i in range(self._pool.numThreads):  # generate a random sequence
            if Dice.throw(2) == 1:
                wakeSeq.append(i)
            else:
                wakeSeq.insert(0, i)
        logger.debug(
            "[TRD] Main thread waking up worker threads: {}".format(
                str(wakeSeq)))
        # TODO: set dice seed to a deterministic value
        for i in wakeSeq:
            # TODO: maybe a bit too deep?!
            self._pool.threadList[i].tapStepGate()
            time.sleep(0)  # yield

    def isRunning(self):
        """True while a TaskExecutor is installed for the current step."""
        return self._te is not None

    def _initDbs(self):
        ''' Initialize multiple databases, invoked at __init__() time.

        gConfig.max_dbs == 0 still yields a single database (index 0).
        '''
        numDbs = 1 if gConfig.max_dbs == 0 else gConfig.max_dbs
        dbc = self.getDbManager().getDbConn()
        self._dbs = [Database(i, dbc) for i in range(numDbs)]  # type: List[Database]

    def pickDatabase(self):
        """Pick a database at random (always index 0 when gConfig.max_dbs == 0)."""
        idxDb = 0
        if gConfig.max_dbs != 0:
            idxDb = Dice.throw(gConfig.max_dbs)  # 0 to N-1
        db = self._dbs[idxDb]  # type: Database
        return db

    def fetchTask(self) -> Task:
        ''' The thread coordinator (that's us) is responsible for fetching a task
            to be executed next: pick a database, let its state machine choose
            a task class, and instantiate it.

            :raises RuntimeError: if the coordinator is not running
        '''
        if not self.isRunning():  # no task
            raise RuntimeError("Cannot fetch task when not running")

        # pick a task type for current state
        db = self.pickDatabase()
        taskType = db.getStateMachine().pickTaskType()  # a Task subclass, not an instance
        return taskType(self._execStats, db)  # create a task from it

    def resetExecutedTasks(self):
        # Take the same lock saveExecutedTask() uses, so a reset can never
        # interleave with a worker appending to the list.
        with self._lock:
            self._executedTasks = []

    def saveExecutedTask(self, task):
        with self._lock:
            self._executedTasks.append(task)
526 527

# We define a class to run a number of threads in locking steps.
S
Shuduo Sang 已提交
528

529 530 531 532
class Helper:
    @classmethod
    def convertErrno(cls, errno):
        """Normalize an error number to its non-negative form.

        A negative value is treated as a signed 32-bit code with the sign bit
        set; adding 0x80000000 clears that bit, e.g. -2147482752
        (0x80000380 as int32) becomes 0x380. Non-negative values are returned
        unchanged.

        :param errno: the error number (may be negative, or None)
        :returns: the normalized number; 0 when errno is None
        """
        if errno is None:
            # Avoid a TypeError while formatting/handling another error.
            return 0
        # `>= 0`, not `> 0`: errno 0 used to be turned into 0x80000000.
        return errno if errno >= 0 else 0x80000000 + errno
S
Shuduo Sang 已提交
533

534
class ThreadPool:
    """A fixed number of WorkerThreads, run in lock-step by a ThreadCoordinator."""

    def __init__(self, numThreads, maxSteps):
        self.numThreads = numThreads
        self.maxSteps = maxSteps
        # Internal class variables
        self.curStep = 0
        self.threadList = []  # type: List[WorkerThread]

    def createAndStartThreads(self, tc: ThreadCoordinator):
        """Create numThreads workers bound to *tc*, record each one, then start it."""
        for tid in range(self.numThreads):  # Create the threads
            workerThread = WorkerThread(self, tid, tc)
            self.threadList.append(workerThread)
            workerThread.start()  # start, but should block immediately before step 0

    def joinAll(self):
        """Wait for every recorded worker thread to terminate."""
        for workerThread in self.threadList:
            logger.debug("Joining thread...")
            workerThread._thread.join()

    def cleanup(self):
        """Drop the references to the worker threads (they are not joined here)."""
        self.threadList = None  # maybe clean up each?

557 558
# A queue of contiguous POSITIVE integers, used by DbManager to generate consecutive numbers
# for new table names
S
Shuduo Sang 已提交
559 560


S
Steven Li 已提交
561 562
class LinearQueue:
    """A queue of contiguous positive integers [firstIndex..lastIndex].

    Indexes can also be marked "in use": an in-use index cannot be popped
    or allocated again until it is released. Mutating operations hold an
    RLock, since they call each other.
    """

    def __init__(self):
        self.firstIndex = 1  # 1st ever element
        self.lastIndex = 0
        self._lock = threading.RLock()  # our functions may call each other
        self.inUse = set()  # the indexes that are in use right now

    def toText(self):
        return "[{}..{}], in use: {}".format(
            self.firstIndex, self.lastIndex, self.inUse)

    def push(self):
        """Append the next integer at the tail, mark it in use and return it."""
        with self._lock:
            self.lastIndex += 1
            self.allocate(self.lastIndex)
            return self.lastIndex

    def pop(self):
        """Remove and return the head index.

        :returns: the head index; False (nothing removed) if the queue is
            empty or the head index is currently in use
        """
        with self._lock:
            if self.isEmpty():
                return False  # TODO: None?
            index = self.firstIndex
            if index in self.inUse:
                return False
            self.firstIndex += 1
            return index

    def isEmpty(self):
        return self.firstIndex > self.lastIndex

    def popIfNotEmpty(self):
        """Like pop(), but returns 0 (instead of False) for an empty queue."""
        with self._lock:
            if self.isEmpty():
                return 0
            return self.pop()

    def allocate(self, i):
        """Mark index *i* in use.

        :raises RuntimeError: if *i* is already in use
        """
        with self._lock:
            if i in self.inUse:
                raise RuntimeError(
                    "Cannot re-use same index in queue: {}".format(i))
            self.inUse.add(i)

    def release(self, i):
        """Clear the in-use mark of *i* (KeyError if it was not in use)."""
        with self._lock:
            self.inUse.remove(i)  # KeyError possible, TODO: why?

    def size(self):
        return self.lastIndex + 1 - self.firstIndex

    def pickAndAllocate(self):
        """Randomly pick an index that is not in use, allocate and return it.

        :returns: the allocated index, or None if the queue is empty or no
            free index was found within 10 * size() random attempts
        """
        with self._lock:
            # Check emptiness while holding the lock. Checking before taking
            # it let another thread pop the last element in between, leaving
            # firstIndex > lastIndex for the random pick below.
            if self.isEmpty():
                return None
            for _ in range(self.size() * 10):  # give up after 10x iterations
                ret = Dice.throwRange(self.firstIndex, self.lastIndex + 1)
                if ret not in self.inUse:
                    self.allocate(ret)
                    return ret
            return None

S
Shuduo Sang 已提交
637

638
class DbConn:
    """Base class for a connection to the database server.

    Subclasses implement the transport-specific methods (openByType,
    execute, query, getQueryResult, getResultRows, getResultCols, close).
    This base class provides open() bookkeeping and query helpers built on
    top of them. Obtain instances through the create*() classmethods.
    """
    TYPE_NATIVE = "native-c"
    TYPE_REST = "rest-api"
    TYPE_INVALID = "invalid"

    @classmethod
    def create(cls, connType):
        """Instantiate the subclass for *connType* (TYPE_NATIVE or TYPE_REST).

        :raises RuntimeError: for any other connection type
        """
        if connType == cls.TYPE_NATIVE:
            return DbConnNative()
        elif connType == cls.TYPE_REST:
            return DbConnRest()
        else:
            raise RuntimeError(
                "Unexpected connection type: {}".format(connType))

    @classmethod
    def createNative(cls):
        return cls.create(cls.TYPE_NATIVE)

    @classmethod
    def createRest(cls):
        return cls.create(cls.TYPE_REST)

    def __init__(self):
        self.isOpen = False
        self._type = self.TYPE_INVALID
        self._lastSql = None  # set by subclasses when they run SQL

    def getLastSql(self):
        """Return the last SQL text a subclass recorded, or None."""
        return self._lastSql

    def open(self):
        """Open the connection through the subclass's openByType().

        :raises RuntimeError: if the connection is already open
        """
        if self.isOpen:
            raise RuntimeError("Cannot re-open an existing DB connection")

        # below implemented by child classes
        self.openByType()

        logger.debug("[DB] data connection opened, type = {}".format(self._type))
        self.isOpen = True

    def queryScalar(self, sql) -> int:
        return self._queryAny(sql)

    def queryString(self, sql) -> str:
        return self._queryAny(sql)

    def _queryAny(self, sql):
        """Run *sql* and return the single value of its 1-row, 1-column result.

        :raises RuntimeError: if the connection is not open, or the result
            set is not exactly one row by one column
        :raises taos.error.ProgrammingError: if the query does not return
            exactly one row (errno 0x991 for none, 0x992 for several)
        """
        if not self.isOpen:
            raise RuntimeError("Cannot query database until connection is open")
        nRows = self.query(sql)
        if nRows != 1:
            raise taos.error.ProgrammingError(
                "Unexpected result for query: {}, rows = {}".format(sql, nRows),
                (0x991 if nRows == 0 else 0x992)
            )
        if self.getResultRows() != 1 or self.getResultCols() != 1:
            raise RuntimeError("Unexpected result set for query: {}".format(sql))
        return self.getQueryResult()[0][0]

    def use(self, dbName):
        self.execute("use {}".format(dbName))

    def existsDatabase(self, dbName: str):
        ''' Check if a certain database exists (first column of "show databases") '''
        self.query("show databases")
        dbs = [v[0] for v in self.getQueryResult()]
        return dbName in dbs  # TODO: super weird type mangling seen, once here

    def hasTables(self):
        return self.query("show tables") > 0

    def execute(self, sql):
        ''' Return the number of rows affected'''
        raise RuntimeError("Unexpected execution, should be overriden")

    def safeExecute(self, sql):
        '''Safely execute any SQL query, returning True/False upon success/failure.

        Only TAOS ProgrammingErrors are turned into False; any other
        exception propagates.
        '''
        try:
            self.execute(sql)
            return True  # ignore num of results, return success
        except taos.error.ProgrammingError:
            return False  # failed, for whatever TAOS reason

    def query(self, sql) -> int:
        ''' Return the number of rows returned by the query'''
        raise RuntimeError("Unexpected execution, should be overriden")

    def openByType(self):
        raise RuntimeError("Unexpected execution, should be overriden")

    def getQueryResult(self):
        raise RuntimeError("Unexpected execution, should be overriden")

    def getResultRows(self):
        raise RuntimeError("Unexpected execution, should be overriden")

    def getResultCols(self):
        raise RuntimeError("Unexpected execution, should be overriden")

    def close(self):
        ''' Close the connection; callers (e.g. worker clean-up) rely on every
        connection having close(), so it is declared here like the others '''
        raise RuntimeError("Unexpected execution, should be overriden")

# Sample: curl -u root:taosdata -d "show databases" localhost:6041/rest/sql
S
Shuduo Sang 已提交
742 743


744 745 746 747
class DbConnRest(DbConn):
    def __init__(self):
        super().__init__()
        self._type = self.TYPE_REST
S
Steven Li 已提交
748
        self._url = "http://localhost:6041/rest/sql"  # fixed for now
749 750
        self._result = None

S
Shuduo Sang 已提交
751 752 753
    def openByType(self):  # Open connection
        pass  # do nothing, always open

754
    def close(self):
S
Shuduo Sang 已提交
755
        if (not self.isOpen):
756
            raise RuntimeError("Cannot clean up database until connection is open")
757 758 759 760 761
        # Do nothing for REST
        logger.debug("[DB] REST Database connection closed")
        self.isOpen = False

    def _doSql(self, sql):
762
        self._lastSql = sql # remember this, last SQL attempted
763 764 765
        try:
            r = requests.post(self._url, 
                data = sql,
766
                auth = HTTPBasicAuth('root', 'taosdata'))         
767 768 769
        except:
            print("REST API Failure (TODO: more info here)")
            raise
770 771
        rj = r.json()
        # Sanity check for the "Json Result"
S
Shuduo Sang 已提交
772
        if ('status' not in rj):
773 774
            raise RuntimeError("No status in REST response")

S
Shuduo Sang 已提交
775 776 777 778
        if rj['status'] == 'error':  # clearly reported error
            if ('code' not in rj):  # error without code
                raise RuntimeError("REST error return without code")
            errno = rj['code']  # May need to massage this in the future
779
            # print("Raising programming error with REST return: {}".format(rj))
S
Shuduo Sang 已提交
780 781
            raise taos.error.ProgrammingError(
                rj['desc'], errno)  # todo: check existance of 'desc'
782

S
Shuduo Sang 已提交
783 784 785 786
        if rj['status'] != 'succ':  # better be this
            raise RuntimeError(
                "Unexpected REST return status: {}".format(
                    rj['status']))
787 788

        nRows = rj['rows'] if ('rows' in rj) else 0
S
Shuduo Sang 已提交
789
        self._result = rj
790 791
        return nRows

S
Shuduo Sang 已提交
792 793 794 795
    def execute(self, sql):
        if (not self.isOpen):
            raise RuntimeError(
                "Cannot execute database commands until connection is open")
796 797
        logger.debug("[SQL-REST] Executing SQL: {}".format(sql))
        nRows = self._doSql(sql)
S
Shuduo Sang 已提交
798 799
        logger.debug(
            "[SQL-REST] Execution Result, nRows = {}, SQL = {}".format(nRows, sql))
800 801
        return nRows

S
Shuduo Sang 已提交
802
    def query(self, sql):  # return rows affected
803 804 805 806 807 808 809 810 811 812 813 814 815
        return self.execute(sql)

    def getQueryResult(self):
        return self._result['data']

    def getResultRows(self):
        print(self._result)
        raise RuntimeError("TBD")
        # return self._tdSql.queryRows

    def getResultCols(self):
        print(self._result)
        raise RuntimeError("TBD")
S
Shuduo Sang 已提交
816

817
    # Duplicate code from TDMySQL, TODO: merge all this into DbConnNative


class MyTDSql:
    '''
    Thin wrapper around a native TAOS connection and its cursor.

    Remembers the last statement (self.sql) and the results of the last
    query()/execute() call. It also records, class wide, the slowest
    statement seen so far.
    '''
    # Class variables
    _clsLock = threading.Lock()  # class wide locking
    longestQuery = None  # type: str
    longestQueryTime = 0.0  # seconds
    lqStartTime = 0.0
    # lqEndTime = 0.0 # Not needed, as we have the two above already

    def __init__(self, hostAddr, cfgPath):
        # Make the DB connection
        self._conn = taos.connect(host=hostAddr, config=cfgPath)
        self._cursor = self._conn.cursor()

        self.queryRows = 0
        self.queryCols = 0
        self.affectedRows = 0
        self.queryResult = None  # rows fetched by the last query()
        self.sql = None  # last statement passed to query()/execute()

    def close(self):
        '''
        Close the cursor, then the underlying connection.

        Closing the cursor does NOT close the DB connection, so both are
        closed explicitly. The connection is closed even if closing the
        cursor raises.
        '''
        try:
            self._cursor.close()
        finally:
            self._conn.close()

    def _execInternal(self, sql):
        '''
        Execute `sql` on the cursor and return the cursor's execute() result.

        If the statement took more than 10ms longer than the slowest one seen
        so far, it is recorded as the new class-wide longest query.
        '''
        startTime = time.time()
        ret = self._cursor.execute(sql)
        queryTime = time.time() - startTime
        # Record the query time
        cls = self.__class__
        # Cheap pre-check without the lock, then re-check under the lock.
        # Otherwise a concurrent thread that just stored a longer time could
        # be overwritten by this shorter one.
        if queryTime > (cls.longestQueryTime + 0.01):
            with cls._clsLock:
                if queryTime > (cls.longestQueryTime + 0.01):
                    cls.longestQuery = sql
                    cls.longestQueryTime = queryTime
                    cls.lqStartTime = startTime
        return ret

    def query(self, sql):
        '''
        Run `sql`, fetch every row into self.queryResult, and return the row count.

        Also updates self.queryRows and self.queryCols. Exceptions from the
        driver propagate unchanged.
        '''
        self.sql = sql
        self._execInternal(sql)
        self.queryResult = self._cursor.fetchall()
        self.queryRows = len(self.queryResult)
        self.queryCols = len(self._cursor.description)
        return self.queryRows

    def execute(self, sql):
        '''
        Execute `sql` and return the cursor's result, kept in self.affectedRows.

        Exceptions from the driver propagate unchanged.
        '''
        self.sql = sql
        self.affectedRows = self._execInternal(sql)
        return self.affectedRows


class TdeInstance():
    """
    A class to capture the *static* information of a TDengine instance,
    including the location of the various files/directories, and basic
    configuration.
    """

    @classmethod
    def _getBuildPath(cls):
        '''
        Locate the build root, i.e. the directory whose build/bin holds taosd.

        The project tree is derived from this file's location. Any taosd
        under a "packaging" directory is ignored.

        Raises:
            RuntimeError: if no suitable taosd is found.
        '''
        selfPath = os.path.dirname(os.path.realpath(__file__))
        if ("community" in selfPath):
            projPath = selfPath[:selfPath.find("communit")]
        else:
            testsIdx = selfPath.find("tests")
            # find() returns -1 when absent; slicing with it would chop a character
            projPath = selfPath[:testsIdx] if testsIdx >= 0 else selfPath

        buildSuffix = "/build/bin"
        buildPath = None
        for root, _dirs, files in os.walk(projPath):
            # Only accept <buildDir>/build/bin, since getExecFile() rebuilds
            # the taosd path from that layout
            if ("taosd" in files) and root.endswith(buildSuffix):
                rootRealPath = os.path.dirname(os.path.realpath(root))
                if ("packaging" not in rootRealPath):
                    buildPath = root[:len(root) - len(buildSuffix)]
                    break
        if buildPath is None:
            raise RuntimeError("Failed to determine buildPath, selfPath={}, projPath={}"
                .format(selfPath, projPath))
        return buildPath

    def __init__(self, subdir='test'):
        self._buildDir = self._getBuildPath()
        # Tolerate a leading "/" so both "test" and "/test" give "/test"
        self._subdir = '/' + subdir.lstrip('/')

    def __repr__(self):
        return "[TdeInstance: {}, subdir={}]".format(self._buildDir, self._subdir)

    def generateCfgFile(self):
        '''
        Write taos.cfg into the config dir, unless that file already exists.

        Creates the config dir (like "mkdir -p") when it is missing.

        Raises:
            CrashGenError: if the cfg path exists but is not a regular file,
                or the cfg dir path exists but is not a directory.
        '''
        cfgDir  = self.getCfgDir()
        cfgFile = cfgDir + "/taos.cfg" # TODO: inquire if this is fixed
        if os.path.exists(cfgFile):
            if os.path.isfile(cfgFile):
                logger.warning("Config file exists already, skip creation: {}".format(cfgFile))
                return # cfg file already exists, nothing to do
            else:
                raise CrashGenError("Invalid config file: {}".format(cfgFile))
        # Now that the cfg file doesn't exist
        if os.path.exists(cfgDir):
            if not os.path.isdir(cfgDir):
                raise CrashGenError("Invalid config dir: {}".format(cfgDir))
            # else: good path
        else: 
            os.makedirs(cfgDir, exist_ok=True) # like "mkdir -p"
        # Now we have a good cfg dir
        cfgValues = {
            'runDir': self.getRunDir(),
            'ip': '127.0.0.1', # TODO: change to a network addressable ip
            'port': 6030,
        }
        cfgTemplate = """
dataDir {runDir}/data
logDir  {runDir}/log

charset UTF-8

firstEp {ip}:{port}
fqdn {ip}
serverPort {port}

# was all 135 below
dDebugFlag 135
cDebugFlag 135
rpcDebugFlag 135
qDebugFlag 135
# httpDebugFlag 143
# asyncLog 0
# tables 10
maxtablesPerVnode 10
rpcMaxTime 101
# cache 2
keep 36500
# walLevel 2
walLevel 1
#
# maxConnections 100
"""
        cfgContent = cfgTemplate.format_map(cfgValues)
        # Context manager: the handle is closed even if the write fails
        with open(cfgFile, "w") as f:
            f.write(cfgContent)

    def rotateLogs(self):
        '''Rename an existing log dir aside, suffixed with the current time.'''
        logPath = self.getLogDir()
        # ref: https://stackoverflow.com/questions/1995373/deleting-all-files-in-a-directory-with-python/1995397
        if os.path.exists(logPath):
            logPathSaved = logPath + "_" + time.strftime('%Y-%m-%d-%H-%M-%S')
            logger.info("Saving old log files to: {}".format(logPathSaved))
            os.rename(logPath, logPathSaved)
        # os.mkdir(logPath) # recreate, no need actually, TDengine will auto-create with proper perms

    def getExecFile(self): # .../taosd
        return self._buildDir + "/build/bin/taosd"

    def getRunDir(self): # TODO: rename to "root dir" ?!
        return self._buildDir + self._subdir

    def getCfgDir(self): # path, not file
        return self.getRunDir() + "/cfg"

    def getLogDir(self):
        return self.getRunDir() + "/log"

    def getHostAddr(self):
        return "127.0.0.1"

    def getServiceCommand(self): # to start the instance
        return [self.getExecFile(), '-c', self.getCfgDir()] # used in subprocess.Popen()


class DbConnNative(DbConn):
    '''
    Database connection that uses the native TAOS client (through MyTDSql).

    A class-wide lock serialises connection opening, and a class-wide
    counter tracks how many native connections are currently open.
    '''
    # Class variables
    _lock = threading.Lock()
    _connInfoDisplayed = False
    totalConnections = 0 # Not private

    def __init__(self):
        super().__init__()
        self._type = self.TYPE_NATIVE
        self._conn = None

    def openByType(self):  # Open connection
        '''
        Open a native connection to gContainer.defTdeInstance.

        Increments the class-wide connection counter. If the initial
        "reset query cache" fails, the new connection is closed and the
        counter restored before the error is re-raised, so nothing leaks.
        '''
        global gContainer
        tdeInstance = gContainer.defTdeInstance # set up in ClientManager, type: TdeInstance
        cfgPath  = tdeInstance.getCfgDir()
        hostAddr = tdeInstance.getHostAddr()

        cls = self.__class__ # Get the class, to access class variables
        with cls._lock: # force single threading for opening DB connections. # TODO: whaaat??!!!
            if not cls._connInfoDisplayed:
                cls._connInfoDisplayed = True # updating CLASS variable
                logger.info("Initiating TAOS native connection to {}, using config at {}".format(hostAddr, cfgPath))
            self._tdSql = MyTDSql(hostAddr, cfgPath) # making DB connection
            cls.totalConnections += 1 # Record the count in the class

        try:
            self._tdSql.execute('reset query cache')
        except Exception:
            # Undo the open: release the native connection and the count
            try:
                self._tdSql.close()
            finally:
                with cls._lock:
                    cls.totalConnections -= 1
            raise

    def close(self):
        '''
        Close the native connection and decrement the class-wide counter.

        Raises:
            RuntimeError: if the connection is not open.
        '''
        if (not self.isOpen):
            raise RuntimeError("Cannot clean up database until connection is open")
        self._tdSql.close()
        # Decrement the class wide counter
        cls = self.__class__ # Get the class, to access class variables
        with cls._lock:
            cls.totalConnections -= 1

        logger.debug("[DB] Database connection closed")
        self.isOpen = False

    def execute(self, sql):
        '''
        Execute `sql` and return the result of MyTDSql.execute().

        Raises:
            RuntimeError: if the connection is not open.
        '''
        if (not self.isOpen):
            raise RuntimeError("Cannot execute database commands until connection is open")
        logger.debug("[SQL] Executing SQL: {}".format(sql))
        self._lastSql = sql
        nRows = self._tdSql.execute(sql)
        logger.debug(
            "[SQL] Execution Result, nRows = {}, SQL = {}".format(
                nRows, sql))
        return nRows

    def query(self, sql):  # return rows affected
        '''
        Run `sql` as a query and return the number of rows fetched.

        The rows themselves are available through getQueryResult().

        Raises:
            RuntimeError: if the connection is not open.
        '''
        if (not self.isOpen):
            raise RuntimeError(
                "Cannot query database until connection is open")
        logger.debug("[SQL] Executing SQL: {}".format(sql))
        self._lastSql = sql
        nRows = self._tdSql.query(sql)
        logger.debug(
            "[SQL] Query Result, nRows = {}, SQL = {}".format(
                nRows, sql))
        return nRows

    def getQueryResult(self):
        return self._tdSql.queryResult

    def getResultRows(self):
        return self._tdSql.queryRows

    def getResultCols(self):
        return self._tdSql.queryCols


class AnyState:
    '''
    Base class for the database states the crash generator tracks.

    Each concrete state provides an info list via getInfo(). The state value
    sits at STATE_VAL_IDX, followed by capability flags addressed by the
    CAN_* constants. The assert*/has* helpers check a step's tasks.
    '''
    STATE_INVALID = -1
    STATE_EMPTY = 0  # nothing there, no even a DB
    STATE_DB_ONLY = 1  # we have a DB, but nothing else
    STATE_TABLE_ONLY = 2  # we have a table, but totally empty
    STATE_HAS_DATA = 3  # we have some data in the table
    _stateNames = ["Invalid", "Empty", "DB_Only", "Table_Only", "Has_Data"]

    # Positions inside the list returned by getInfo()
    STATE_VAL_IDX = 0
    CAN_CREATE_DB = 1
    # "Can drop the DB" holds only under normal circumstances; canDropDb()
    # overrides it when gConfig.max_dbs > 0 (it may also be overridden by -b)
    CAN_DROP_DB = 2
    CAN_CREATE_FIXED_SUPER_TABLE = 3
    CAN_DROP_FIXED_SUPER_TABLE = 4
    CAN_ADD_DATA = 5
    CAN_READ_DATA = 6

    def __init__(self):
        self._info = self.getInfo()

    def __str__(self):
        # Shift by one so that STATE_INVALID (-1) lands on "Invalid"
        nameIdx = self._info[self.STATE_VAL_IDX] + 1
        return self._stateNames[nameIdx]

    def getInfo(self):
        '''Return this state's info list; child classes must supply it.'''
        raise RuntimeError("Must be overriden by child classes")

    def equals(self, other):
        '''
        Compare state values with an int or another AnyState.

        Raises:
            RuntimeError: for any other type of `other`.
        '''
        if isinstance(other, int):
            otherVal = other
        elif isinstance(other, AnyState):
            otherVal = other.getValIndex()
        else:
            raise RuntimeError(
                "Unexpected comparison, type = {}".format(
                    type(other)))
        return self.getValIndex() == otherVal

    def verifyTasksToState(self, tasks, newState):
        '''Check that `tasks` can lead to `newState`; must be overridden.'''
        raise RuntimeError("Must be overriden by child classes")

    def getValIndex(self):
        return self._info[self.STATE_VAL_IDX]

    def getValue(self):
        return self._info[self.STATE_VAL_IDX]

    def canCreateDb(self):
        return self._info[self.CAN_CREATE_DB]

    def canDropDb(self):
        # When the user asks to run up to a number of DBs, drop_db
        # operations are no longer performed
        if gConfig.max_dbs > 0:
            return False
        return self._info[self.CAN_DROP_DB]

    def canCreateFixedSuperTable(self):
        return self._info[self.CAN_CREATE_FIXED_SUPER_TABLE]

    def canDropFixedSuperTable(self):
        return self._info[self.CAN_DROP_FIXED_SUPER_TABLE]

    def canAddData(self):
        return self._info[self.CAN_ADD_DATA]

    def canReadData(self):
        return self._info[self.CAN_READ_DATA]

    def assertAtMostOneSuccess(self, tasks, cls):
        '''Raise RuntimeError as soon as a second successful `cls` task appears.'''
        successes = 0
        for task in (t for t in tasks if isinstance(t, cls)):
            if not task.isSuccess():
                continue
            successes += 1
            if successes >= 2:
                raise RuntimeError(
                    "Unexpected more than 1 success with task: {}".format(cls))

    def assertIfExistThenSuccess(self, tasks, cls):
        '''Raise RuntimeError if `cls` tasks exist but none of them succeeded.'''
        matching = [t for t in tasks if isinstance(t, cls)]
        successes = sum(1 for t in matching if t.isSuccess())
        if matching and successes <= 0:
            raise RuntimeError("Unexpected zero success for task type: {}, from tasks: {}"
                .format(cls, tasks))

    def assertNoTask(self, tasks, cls):
        '''Raise CrashGenError if any task of type `cls` is present.'''
        if any(isinstance(t, cls) for t in tasks):
            raise CrashGenError(
                "This task: {}, is not expected to be present, given the success/failure of others".format(cls.__name__))

    def assertNoSuccess(self, tasks, cls):
        '''Raise RuntimeError on the first successful task of type `cls`.'''
        for task in tasks:
            if isinstance(task, cls) and task.isSuccess():
                raise RuntimeError(
                    "Unexpected successful task: {}".format(cls))

    def hasSuccess(self, tasks, cls):
        '''Return True if some task of type `cls` succeeded.'''
        return any(isinstance(t, cls) and t.isSuccess() for t in tasks)

    def hasTask(self, tasks, cls):
        '''Return True if any task is of type `cls`.'''
        return any(isinstance(t, cls) for t in tasks)


class StateInvalid(AnyState):
    '''State value STATE_INVALID: every capability flag is False.'''

    def getInfo(self):
        dbFlags = [False, False]     # can create/drop Db
        tableFlags = [False, False]  # can create/drop fixed table
        dataFlags = [False, False]   # can insert/read data with fixed table
        return [self.STATE_INVALID] + dbFlags + tableFlags + dataFlags

    # verifyTasksToState is not overridden here (AnyState's version raises)


class StateEmpty(AnyState):
    '''No database yet: only creating a DB is allowed.'''

    def getInfo(self):
        dbFlags = [True, False]      # can create/drop Db
        tableFlags = [False, False]  # can create/drop fixed table
        dataFlags = [False, False]   # can insert/read data with fixed table
        return [self.STATE_EMPTY] + dbFlags + tableFlags + dataFlags

    def verifyTasksToState(self, tasks, newState):
        '''
        Check the tasks that ran while we were EMPTY.

        If a create-db task succeeded and no drop-db task ran at all, then
        at most one create-db task may have succeeded.
        '''
        if not self.hasSuccess(tasks, TaskCreateDb):
            return
        if self.hasTask(tasks, TaskDropDb):
            return  # drops make the create count ambiguous
        # TODO: compare numbers
        self.assertAtMostOneSuccess(tasks, TaskCreateDb)


class StateDbOnly(AnyState):
    '''A database exists but holds no tables yet.'''

    def getInfo(self):
        dbFlags = [False, True]      # can create/drop Db
        tableFlags = [True, False]   # can create/drop fixed table
        dataFlags = [False, False]   # can insert/read data with fixed table
        return [self.STATE_DB_ONLY] + dbFlags + tableFlags + dataFlags

    def verifyTasksToState(self, tasks, newState):
        '''
        Check the tasks that ran while we were DB_ONLY.

        Unless some create-db task ran, at most one drop-db may have succeeded.
        '''
        if self.hasTask(tasks, TaskCreateDb):
            return  # only check when we don't create any more
        self.assertAtMostOneSuccess(tasks, TaskDropDb)

        # TODO: restore the below, the problem exists, although unlikely in real-world
        # if (gSvcMgr == None) or (not gSvcMgr.isRestarting()) :
        #     self.assertIfExistThenSuccess(tasks, TaskDropDb)


class StateSuperTableOnly(AnyState):
    '''The fixed super table exists; state value is STATE_TABLE_ONLY.'''

    def getInfo(self):
        dbFlags = [False, True]      # can create/drop Db
        tableFlags = [False, True]   # can create/drop fixed table
        dataFlags = [True, True]     # can insert/read data with fixed table
        return [self.STATE_TABLE_ONLY] + dbFlags + tableFlags + dataFlags

    def verifyTasksToState(self, tasks, newState):
        '''Check the tasks that ran while only the super table existed.'''
        if not self.hasSuccess(tasks, TaskDropSuperTable):
            return
        # We were able to drop the table, so presumably it was (re)created.
        # NOTE(review): the result below is discarded, so this checks nothing.
        # Stricter assertions were apparently disabled for massively parallel
        # runs; confirm before turning this into a real assertion.
        self.hasSuccess(tasks, TaskCreateSuperTable)
        # TODO: need to revamp!!


class StateHasData(AnyState):
    '''Tables exist and hold data; state value is STATE_HAS_DATA.'''

    def getInfo(self):
        dbFlags = [False, True]      # can create/drop Db
        tableFlags = [False, True]   # can create/drop fixed table
        dataFlags = [True, True]     # can insert/read data with fixed table
        return [self.STATE_HAS_DATA] + dbFlags + tableFlags + dataFlags

    def verifyTasksToState(self, tasks, newState):
        '''Check the tasks that moved us from HAS_DATA to `newState`.'''
        if newState.equals(AnyState.STATE_EMPTY):
            self.hasSuccess(tasks, TaskDropDb)  # NOTE(review): result unused
            if not self.hasTask(tasks, TaskCreateDb):
                self.assertAtMostOneSuccess(tasks, TaskDropDb)  # TODO: dicy
            return

        if newState.equals(AnyState.STATE_DB_ONLY):
            dbRecreated = self.hasTask(tasks, TaskCreateDb)
            if not dbRecreated:
                # DB still exists and was not re-created: nothing may have dropped it
                self.assertNoTask(tasks, TaskDropDb)
            self.hasSuccess(tasks, TaskDropSuperTable)  # NOTE(review): result unused
            return

        # Otherwise the new state should be STATE_HAS_DATA
        dbRecreated = self.hasTask(tasks, TaskCreateDb)
        if not dbRecreated:
            # we shouldn't have dropped it
            self.assertNoTask(tasks, TaskDropDb)
        tableRecreated = self.hasTask(tasks, TaskCreateSuperTable)
        if not tableRecreated:
            # we should not have a task that drops it
            self.assertNoTask(tasks, TaskDropSuperTable)


class StateMechine:
    '''
    Tracks the current AnyState of one Database and drives its transitions.

    The state is discovered by querying the server, checked against the
    tasks run in each step, and used to pick the next task type at random.
    '''

    def __init__(self, db: Database):
        self._db = db
        # transition target probabilities, indexed with value of STATE_EMPTY, STATE_DB_ONLY, etc.
        self._stateWeights = [1, 2, 10, 40]

    def init(self, dbc: DbConn):  # late initialization, don't save the dbConn
        self._curState = self._findCurrentState(dbc)  # starting state
        logger.debug("Found Starting State: {}".format(self._curState))

    # TODO: seems no longer used, remove?
    def getCurrentState(self):
        return self._curState

    def hasDatabase(self):
        '''
        Return True if the current state includes a database (DB_ONLY or later).

        Not based on canDropDb(), which returns False whenever
        gConfig.max_dbs > 0 even though a database exists.
        '''
        return self._curState.getValIndex() >= AnyState.STATE_DB_ONLY

    # May be slow, use cautiously...
    def getTaskTypes(self):  # those that can run (directly/indirectly) from the current state
        '''
        Return the task classes runnable from the current state.

        Includes those that can begin directly, plus those that can begin
        from the end state of a direct one. Each class appears only once.

        Raises:
            RuntimeError: if no task type fits the current state.
        '''
        allTaskClasses = StateTransitionTask.__subclasses__()  # all state transition tasks
        firstTaskTypes = [tc for tc in allTaskClasses if tc.canBeginFrom(self._curState)]

        # now we have all the tasks that can begin directly from the current
        # state, let's figure out the INDIRECT ones
        taskTypes = firstTaskTypes.copy()  # have to have these
        for task1 in firstTaskTypes:  # each task type gathered so far
            endState = task1.getEndState()  # figure the end state
            if endState is None:  # does not change end state
                continue  # no use, do nothing
            for tc in allTaskClasses:  # what task can further begin from there?
                # Skip anything already gathered, directly or via another first
                # task; duplicates would skew the weights in pickTaskType()
                if tc.canBeginFrom(endState) and (tc not in taskTypes):
                    taskTypes.append(tc)  # gather it

        if len(taskTypes) <= 0:
            raise RuntimeError(
                "No suitable task types found for state: {}".format(
                    self._curState))
        logger.debug(
            "[OPS] Tasks found for state {}: {}".format(
                self._curState,
                [t.__name__ for t in taskTypes]))
        return taskTypes

    def _findCurrentState(self, dbc: DbConn):
        '''Query the server through `dbc` and return the matching AnyState instance.'''
        ts = time.time()  # we use this to debug how fast/slow it is to do the various queries to find the current DB state
        dbName = self._db.getName()
        if not dbc.existsDatabase(dbName):  # no database?!
            logger.debug( "[STT] empty database found, between {} and {}".format(ts, time.time()))
            return StateEmpty()
        # did not do this when opening connection, and this is NOT the worker
        # thread, which does this on their own
        dbc.use(dbName)
        if not dbc.hasTables():  # no tables
            logger.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time()))
            return StateDbOnly()

        sTable = self._db.getFixedSuperTable()
        # NOTE(review): this looks inverted. Having regular tables selects
        # SUPER_TABLE_ONLY, while the comments on both branches say the
        # opposite. Confirm against TdSuperTable.hasRegTables() before changing.
        if sTable.hasRegTables(dbc, dbName):
            logger.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time()))
            return StateSuperTableOnly()
        else:
            logger.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time()))
            return StateHasData()

    # We transition the system to a new state by examining the current state itself
    def transition(self, tasks, dbc: DbConn):
        '''
        Move to the state found on the server after a step that ran `tasks`.

        Does nothing for an empty task list. Otherwise it:
        - runs generic checks based on the current state,
        - re-discovers the state from the server,
        - verifies the old state could reach it via these tasks,
        - adopts the new state.
        '''
        if (len(tasks) == 0):  # before 1st step, or otherwise empty
            logger.debug("[STT] Starting State: {}".format(self._curState))
            return  # do nothing

        # this should show up in the server log, separating steps
        dbc.execute("show dnodes")

        # Generic Checks, first based on the start state
        if self._curState.canCreateDb():
            self._curState.assertIfExistThenSuccess(tasks, TaskCreateDb)
            # Not asserting at-most-one success: creates and drops may interleave

        if self._curState.canDropDb():
            if gSvcMgr is None:  # only if we are running as client-only
                self._curState.assertIfExistThenSuccess(tasks, TaskDropDb)
            # Not asserting at-most-one success: drop-create-drop is possible

        # No generic checks for super tables or data: the whole DB may have been
        # dropped, or create/drop may have interleaved, within a single step

        newState = self._findCurrentState(dbc)
        logger.debug("[STT] New DB state determined: {}".format(newState))
        # can old state move to new state through the tasks?
        self._curState.verifyTasksToState(tasks, newState)
        self._curState = newState

    def pickTaskType(self):
        '''Pick a runnable task class, weighted by the state it leads to.'''
        # all the task types we can choose from at current state
        taskTypes = self.getTaskTypes()
        weights = []
        for tt in taskTypes:
            endState = tt.getEndState()
            if endState is not None:
                # TODO: change to a method
                weights.append(self._stateWeights[endState.getValIndex()])
            else:
                # read data task, default to 10: TODO: change to a constant
                weights.append(10)
        i = self._weighted_choice_sub(weights)
        return taskTypes[i]

    # ref:
    # https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/
    def _weighted_choice_sub(self, weights):
        '''
        Return an index into `weights`, chosen with probability proportional to its weight.

        If no index is selected inside the loop (floating-point rounding, or
        all weights zero), the last index is returned instead of falling
        through to None.
        '''
        # TODO: use our dice to ensure it being deterministic?
        rnd = random.random() * sum(weights)
        for i, w in enumerate(weights):
            rnd -= w
            if rnd < 0:
                return i
        return len(weights) - 1


class Database:
    ''' We use this to represent an actual TDengine database inside a service instance,
        possibly in a cluster environment.

        For now we use it to manage state transitions in that database
    '''
    _clsLock = threading.Lock()  # class wide lock, protects the class-level tick counters
    _lastInt = 101  # next one is initial integer
    _lastTick = 0  # 0 until first set up by getNextTick(), then a datetime
    _lastLaggingTick = 0  # lagging tick, for unsequenced insertions; 0 until set up

    def __init__(self, dbNum: int, dbc: DbConn):  # TODO: remove dbc
        '''
        :param dbNum: number identifying this database; the DB name is derived from it
        :param dbc: DB connection handed to the state machine's init()
        '''
        self._dbNum = dbNum  # we assign a number to databases, for our testing purpose
        # Create the lock first, so that it already exists while the state machine
        # (which receives a reference to this object) is constructed and initialized
        self._lock = threading.RLock()
        self._stateMachine = StateMechine(self)
        self._stateMachine.init(dbc)

    def getStateMachine(self) -> StateMechine:
        return self._stateMachine

    def getDbNum(self):
        return self._dbNum

    def getName(self):
        '''Return the database name, e.g. "db_3" for DB number 3.'''
        return "db_{}".format(self._dbNum)

    def filterTasks(self, inTasks: List[Task]):
        '''Pick out, preserving order, the tasks that belong to this database.'''
        return [task for task in inTasks if task.getDb().isSame(self)]

    def isSame(self, other):
        '''Two Database objects are considered the same if they share the DB number.'''
        return self._dbNum == other._dbNum

    def exists(self, dbc: DbConn):
        '''Return whether this database currently exists, as reported by `dbc`.'''
        return dbc.existsDatabase(self.getName())

    @classmethod
    def getFixedSuperTableName(cls):
        return "fs_table"

    @classmethod
    def getFixedSuperTable(cls) -> TdSuperTable:
        return TdSuperTable(cls.getFixedSuperTableName())

    # We aim to create a starting time tick, such that, whenever we run our test here once
    # We should be able to safely create 100,000 records, which will not have any repeated time stamp
    # when we re-run the test in 3 minutes (180 seconds), basically we should expand time duration
    # by a factor of 500.
    # TODO: what if it goes beyond 10 years into the future
    # TODO: fix the error as result of above: "tsdb timestamp is out of range"
    @classmethod
    def setupLastTick(cls):
        '''Compute a starting tick (datetime) after 2012-01-01, derived from the current time.'''
        t1 = datetime.datetime(2020, 6, 1)
        t2 = datetime.datetime.now()
        # maybe a very large number, takes 69 years to exceed Python int range
        elSec = int(t2.timestamp() - t1.timestamp())
        # seconds within 8*12*30 days (~7.9 years), i.e. safely within 10 years
        elSec2 = (elSec % (8 * 12 * 30 * 24 * 60 * 60 / 500)) * 500
        # print("elSec = {}".format(elSec))
        t3 = datetime.datetime(2012, 1, 1)  # default "keep" is 10 years
        t4 = datetime.datetime.fromtimestamp(
            t3.timestamp() + elSec2)  # see explanation above
        logger.info("Setting up TICKS to start from: {}".format(t4))
        return t4

    @classmethod
    def getNextTick(cls):
        '''Return the next timestamp (a datetime) to use when inserting data.

        With a 1 in 20 chance (when Dice.throw(20) returns 0) this returns the
        next "lagging" tick, which starts 10,000 seconds behind the regular ticks
        and advances one second per use, to exercise unsequenced insertions.
        Otherwise it returns the next regular tick, one second after the last one.
        '''
        with cls._clsLock:  # prevent duplicate tick
            if cls._lastTick == 0 or cls._lastLaggingTick == 0:
                # Derive both series from ONE base tick: two separate calls read the
                # clock twice and could straddle the modulo wrap in setupLastTick(),
                # breaking the "lagging is behind regular" relation
                baseTick = cls.setupLastTick()  # should be quite a bit into the future
                if cls._lastLaggingTick == 0:
                    # 10k at 1/20 chance, should be enough to avoid overlaps
                    cls._lastLaggingTick = baseTick + datetime.timedelta(0, -10000)
                if cls._lastTick == 0:
                    cls._lastTick = baseTick

            if Dice.throw(20) == 0:  # 1 in 20 chance, return lagging tick
                cls._lastLaggingTick += datetime.timedelta(0, 1)  # advance lagging tick by one second
                return cls._lastLaggingTick
            else:  # regular
                # add one second to it
                cls._lastTick += datetime.timedelta(0, 1)
                return cls._lastTick

    def getNextInt(self):
        '''Return the next integer from this instance's counter (first call returns 102).'''
        with self._lock:
            self._lastInt += 1
            return self._lastInt

    def getNextBinary(self):
        '''Return a long string value ending with the next integer.'''
        return "Beijing_Shanghai_Los_Angeles_New_York_San_Francisco_Chicago_Beijing_Shanghai_Los_Angeles_New_York_San_Francisco_Chicago_{}".format(
            self.getNextInt())

    def getNextFloat(self):
        '''Return the next integer plus 0.9.'''
        ret = 0.9 + self.getNextInt()
        # print("Float obtained: {}".format(ret))
        return ret
S
Shuduo Sang 已提交
1574

1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623

class DbManager():
    ''' This is a wrapper around DbConn(), to make it easier to use.

        TODO: rename this to DbConnManager
    '''
    def __init__(self):
        '''Create and open a DB connection of the configured connector type.

        Exits the process (status 2) if the client reports 'client disconnected';
        re-raises any other connection error.
        '''
        self.tableNumQueue = LinearQueue()  # TODO: delete?
        # Guards tableNumQueue in addTable(); addTable() used this lock but it
        # was never created, so every call raised AttributeError
        self._lock = threading.RLock()
        # self.openDbServerConnection()
        self._dbConn = DbConn.createNative() if (
            gConfig.connector_type == 'native') else DbConn.createRest()
        try:
            self._dbConn.open()  # may throw taos.error.ProgrammingError: disconnected
        except taos.error.ProgrammingError as err:
            # print("Error type: {}, msg: {}, value: {}".format(type(err), err.msg, err))
            if (err.msg == 'client disconnected'):  # cannot open DB connection
                print(
                    "Cannot establish DB connection, please re-run script without parameter, and follow the instructions.")
                sys.exit(2)
            else:
                print("Failed to connect to DB, errno = {}, msg: {}"
                    .format(Helper.convertErrno(err.errno), err.msg))
                raise
        except BaseException:
            print("[=] Unexpected exception")
            raise

        # Do this after dbConn is in proper shape
        # Moved to Database()
        # self._stateMachine = StateMechine(self._dbConn)

    def getDbConn(self):
        return self._dbConn

    # TODO: not used any more, to delete
    def pickAndAllocateTable(self):  # pick any table, and "use" it
        return self.tableNumQueue.pickAndAllocate()

    # TODO: Not used any more, to delete
    def addTable(self):
        '''Push a new table number onto the queue and return it.'''
        with self._lock:
            tIndex = self.tableNumQueue.push()
        return tIndex

    # Not used any more, to delete
    def releaseTable(self, i):  # return the table back, so others can use it
        self.tableNumQueue.release(i)

    # TODO: not used any more, delete
    def getTableNameToDelete(self):
        '''Pop a table number and return its table name, or False if the pop yields a falsy value.'''
        tblNum = self.tableNumQueue.pop()  # TODO: race condition!
        if (not tblNum):  # maybe false
            return False

        return "table_{}".format(tblNum)

    def cleanUp(self):
        '''Close the managed DB connection.'''
        self._dbConn.close()

1634
class TaskExecutor():
    '''Executes tasks on a worker thread and carries the current step number.'''

    class BoundedList:
        '''Thread-safe ascending list that keeps only the `size` largest numbers added.'''

        def __init__(self, size=10):
            self._size = size
            self._list = []
            self._lock = threading.Lock()

        def add(self, n: int):
            '''Record `n`, keeping only the `size` largest values seen so far.

            Duplicates are kept. While the list is not yet full, every value is
            accepted; once it is full, a value not larger than the current minimum
            is ignored, otherwise it displaces the minimum.
            '''
            import bisect
            with self._lock:
                insPos = bisect.bisect_left(self._list, n)  # first position whose item >= n
                if len(self._list) < self._size:
                    # Not full yet: always keep it (previously the first item acted
                    # as a gate here, dropping smaller values even with room left)
                    self._list.insert(insPos, n)
                    return
                if insPos == 0:
                    return  # not larger than the current minimum: not a top number
                self._list.insert(insPos, n)
                del self._list[0]  # drop the smallest to stay within size

        def __str__(self):
            return repr(self._list)

    _boundedList = BoundedList()  # shared by all executors

    def __init__(self, curStep):
        self._curStep = curStep

    @classmethod
    def getBoundedList(cls):
        return cls._boundedList

    def getCurStep(self):
        return self._curStep

    def execute(self, task: Task, wt: WorkerThread):  # execute a task on a thread
        task.execute(wt)

    def recordDataMark(self, n: int):
        '''Record a number written to the DB in the shared top-numbers list.'''
        # print("[{}]".format(n), end="", flush=True)
        self._boundedList.add(n)
1696

S
Shuduo Sang 已提交
1697

S
Steven Li 已提交
1698
class Task():
    ''' A generic "Task" to be executed. For now we decide that there is no
        need to embed a DB connection here, we use whatever the Worker Thread has
        instead. But a task is always associated with a DB
    '''
    taskSn = 100

    @classmethod
    def allocTaskNum(cls):
        '''Return the next task serial number, shared by Task and all its sub classes.'''
        Task.taskSn += 1  # IMPORTANT: cannot use cls.taskSn, since each sub class will have a copy
        return Task.taskSn

    def __init__(self, execStats: ExecutionStats, db: Database):
        self._workerThread = None
        self._err = None  # type: Exception
        self._aborted = False
        self._curStep = None
        self._numRows = None  # Number of rows affected

        # Assign an incremental task serial number
        self._taskNum = self.allocTaskNum()
        # logger.debug("Creating new task {}...".format(self._taskNum))

        self._execStats = execStats
        self._db = db  # A task is always associated/for a specific DB

    def isSuccess(self):
        return self._err is None

    def isAborted(self):
        return self._aborted

    def clone(self):  # TODO: why do we need this again?
        '''Return a fresh task of the same class, sharing stats and DB.'''
        newTask = self.__class__(self._execStats, self._db)
        return newTask

    def getDb(self):
        return self._db

    def logDebug(self, msg):
        self._workerThread.logDebug(
            "Step[{}.{}] {}".format(
                self._curStep, self._taskNum, msg))

    def logInfo(self, msg):
        self._workerThread.logInfo(
            "Step[{}.{}] {}".format(
                self._curStep, self._taskNum, msg))

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        raise RuntimeError(
            "To be implemeted by child classes, class name: {}".format(
                self.__class__.__name__))

    def _isErrAcceptable(self, errno, msg):
        '''Decide whether a TAOS error may be tolerated instead of aborting the test.

        :param errno: error number, as normalized by Helper.convertErrno()
        :param msg: error message text
        :return: True if the error is acceptable, False otherwise
        '''
        if errno in [
                0x05,  # TSDB_CODE_RPC_NOT_READY
                0x0B,  # Unable to establish connection, more details in TD-1648
                       # (also "network unavailable" when restarting service)
                # 0x200, # invalid SQL, TODO: re-examine with TD-934
                0x217, # "db not selected", client side defined error code
                0x218, # "Table does not exist" client side defined error code
                0x360, 0x362,
                0x369, # tag already exists
                0x36A, 0x36B, 0x36D,
                0x381,
                0x380, # "db not selected"
                0x383,
                0x386,  # DB is being dropped?!
                0x503,
                0x510,  # vnode not in ready state
                0x14,   # db not ready, errno changed
                0x600,  # Invalid table ID, why?
                1000  # REST catch-all error
            ]:
            return True  # These are the ALWAYS-ACCEPTABLE ones
        elif errno == 0x200:  # invalid SQL, we need to dig in a bit more
            if msg.find("invalid column name") != -1:
                return True
            elif msg.find("tags number not matched") != -1:  # mismatched tags after modification
                return True
            elif msg.find("duplicated column names") != -1:  # also alter table tag issues
                return True
        elif gSvcMgr and (not gSvcMgr.isStable()):  # We are managing service, and ...
            logger.info("Ignoring error when service starting/stopping: errno = {}, msg = {}".format(errno, msg))
            return True

        return False  # Not an acceptable error

    def execute(self, wt: WorkerThread):
        '''Run this task on worker thread `wt`, recording statistics and errors.

        Exceptions raised by _executeInternal() are not re-raised: they are stored
        in self._err (so isSuccess() becomes False), and unless the TAOS error is
        acceptable (or continue_on_exception is set) the task is also marked aborted.
        '''
        wt.verifyThreadSelf()
        self._workerThread = wt  # type: ignore

        te = wt.getTaskExecutor()
        self._curStep = te.getCurStep()
        self.logDebug(
            "[-] executing task {}...".format(self.__class__.__name__))

        self._err = None  # TODO: type hint mess up?
        self._execStats.beginTaskType(self.__class__.__name__)  # mark beginning
        errno2 = None

        try:
            self._executeInternal(te, wt)  # TODO: no return value?
        except taos.error.ProgrammingError as err:
            errno2 = Helper.convertErrno(err.errno)
            if (gConfig.continue_on_exception):  # user choose to continue
                self.logDebug("[=] Continue after TAOS exception: errno=0x{:X}, msg: {}, SQL: {}".format(
                        errno2, err, wt.getDbConn().getLastSql()))
                self._err = err
            elif self._isErrAcceptable(errno2, str(err)):
                self.logDebug("[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(
                        errno2, err, wt.getDbConn().getLastSql()))
                print("_", end="", flush=True)
                self._err = err
            else:  # not an acceptable error
                errMsg = "[=] Unexpected Taos library exception ({}): errno=0x{:X}, msg: {}, SQL: {}".format(
                    self.__class__.__name__,
                    errno2, err, wt.getDbConn().getLastSql())
                self.logDebug(errMsg)
                if gConfig.debug:
                    # raise # so that we see full stack
                    traceback.print_exc()
                print(
                    "\n\n----------------------------\nProgram ABORTED Due to Unexpected TAOS Error: \n\n{}\n".format(errMsg) +
                    "----------------------------\n")
                # sys.exit(-1)
                self._err = err
                self._aborted = True
        except Exception as e:
            self.logInfo("Non-TAOS exception encountered")
            self._err = e
            self._aborted = True
            traceback.print_exc()
        except BaseException as e:
            # NOTE: this also catches KeyboardInterrupt/SystemExit raised inside the
            # task; they are recorded as an aborted task rather than propagated
            self.logInfo("Python base exception encountered")
            self._err = e
            self._aborted = True
            traceback.print_exc()
        self._execStats.endTaskType(self.__class__.__name__, self.isSuccess())

        self.logDebug("[X] task execution completed, {}, status: {}".format(
            self.__class__.__name__, "Success" if self.isSuccess() else "Failure"))
        # TODO: merge with above.
        self._execStats.incExecCount(self.__class__.__name__, self.isSuccess(), errno2)

    # TODO: refactor away, just provide the dbConn
    def execWtSql(self, wt: WorkerThread, sql):  # execute an SQL on the worker thread
        """Execute `sql` on worker thread `wt`, returning whatever wt.execSql() returns."""
        return wt.execSql(sql)

    def queryWtSql(self, wt: WorkerThread, sql):  # execute an SQL on the worker thread
        return wt.querySql(sql)

    def getQueryResult(self, wt: WorkerThread):  # execute an SQL on the worker thread
        return wt.getQueryResult()


1866
class ExecutionStats:
    '''Collects task execution statistics for a crash_gen run, and prints a summary.'''

    def __init__(self):
        # total/success times for a task: class name -> [total, success]
        self._execTimes: Dict[str, List[int]] = {}
        self._tasksInProgress = 0
        self._lock = threading.Lock()  # guards counters updated from worker threads
        self._firstTaskStartTime = None
        self._execStartTime = None
        self._errors = {}  # class name -> {errno: count}
        self._elapsedTime = 0.0  # total elapsed time
        self._accRunTime = 0.0  # accumulated run time

        self._failed = False
        self._failureReason = None

    def __str__(self):
        return "[ExecStats: _failed={}, _failureReason={}]".format(
            self._failed, self._failureReason)

    def isFailed(self):
        return self._failed

    def startExec(self):
        self._execStartTime = time.time()

    def endExec(self):
        self._elapsedTime = time.time() - self._execStartTime

    def incExecCount(self, klassName, isSuccess, eno=None):
        '''Count one execution of task class `klassName`, and its error number if any.'''
        # Called concurrently by worker threads: the read-modify-write updates
        # below must not interleave, or counts get lost
        with self._lock:
            counts = self._execTimes.setdefault(klassName, [0, 0])
            counts[0] += 1  # index 0 has the "total" execution times
            if isSuccess:
                counts[1] += 1  # index 1 has the "success" execution times
            if eno is not None:
                errors = self._errors.setdefault(klassName, {})
                errors[eno] = errors.get(eno, 0) + 1

    def beginTaskType(self, klassName):
        with self._lock:
            if self._tasksInProgress == 0:  # starting a new round
                self._firstTaskStartTime = time.time()  # I am now the first task
            self._tasksInProgress += 1

    def endTaskType(self, klassName, isSuccess):
        with self._lock:
            self._tasksInProgress -= 1
            if self._tasksInProgress == 0:  # all tasks have stopped
                self._accRunTime += (time.time() - self._firstTaskStartTime)
                self._firstTaskStartTime = None

    def registerFailure(self, reason):
        self._failed = True
        self._failureReason = reason

    def printStats(self):
        '''Log a human-readable summary of the collected statistics.'''
        logger.info(
            "----------------------------------------------------------------------")
        logger.info(
            "| Crash_Gen test {}, with the following stats:". format(
                "FAILED (reason: {})".format(
                    self._failureReason) if self._failed else "SUCCEEDED"))
        logger.info("| Task Execution Times (success/total):")
        totalExecs = 0
        for klassName, (total, success) in self._execTimes.items():
            totalExecs += total
            errStr = None
            if klassName in self._errors:
                errStr = ", ".join(
                    "0x{:X}:{}".format(eno, cnt) for eno, cnt in self._errors[klassName].items())
            logger.info("|    {0:<24}: {1}/{2} (Errors: {3})".format(klassName, success, total, errStr))

        # Report an exact integer count; the average is guarded separately
        # against division by zero
        avgTime = self._accRunTime / totalExecs if totalExecs else 0.0
        logger.info(
            "| Total Tasks Executed (success or not): {} ".format(totalExecs))
        logger.info(
            "| Total Tasks In Progress at End: {}".format(
                self._tasksInProgress))
        logger.info(
            "| Total Task Busy Time (elapsed time when any task is in progress): {:.3f} seconds".format(
                self._accRunTime))
        logger.info(
            "| Average Per-Task Execution Time: {:.3f} seconds".format(avgTime))
        logger.info(
            "| Total Elapsed Time (from wall clock): {:.3f} seconds".format(
                self._elapsedTime))
        logger.info("| Top numbers written: {}".format(TaskExecutor.getBoundedList()))
        logger.info("| Active DB Native Connections (now): {}".format(DbConnNative.totalConnections))
        logger.info("| Longest native query time: {:.3f} seconds, started: {}".
            format(MyTDSql.longestQueryTime,
                time.strftime("%x %X", time.localtime(MyTDSql.lqStartTime))) )
        logger.info("| Longest native query: {}".format(MyTDSql.longestQuery))
        logger.info(
            "----------------------------------------------------------------------")
1965 1966 1967


class StateTransitionTask(Task):
    '''Base class for tasks that may move a database from one state to another.

    Sub classes are expected to override getEndState() and canBeginFrom().
    '''
    LARGE_NUMBER_OF_TABLES = 35
    SMALL_NUMBER_OF_TABLES = 3
    LARGE_NUMBER_OF_RECORDS = 50
    SMALL_NUMBER_OF_RECORDS = 3

    _endState = None

    @classmethod
    def getInfo(cls):  # each sub class should supply their own information
        raise RuntimeError("Overriding method expected")

    @classmethod
    def getEndState(cls):  # TODO: optimize by calling it fewer times
        '''Return the state this task leads to (sub classes return None if state is unaffected).'''
        raise RuntimeError("Overriding method expected")

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        '''Return whether this kind of task may start while the DB is in `state`.'''
        raise RuntimeError("must be overriden")

    @classmethod
    def getRegTableName(cls, i):
        '''Return the name of the i-th regular table, e.g. "reg_table_3".'''
        return "reg_table_" + str(i)

    def execute(self, wt: WorkerThread):
        super().execute(wt)
S
Shuduo Sang 已提交
2001 2002


2003
class TaskCreateDb(StateTransitionTask):
    '''Create the task's database; the resulting state is StateDbOnly.'''

    @classmethod
    def getEndState(cls):
        return StateDbOnly()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canCreateDb()

    # Actually creating the database(es)
    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        # was: self.execWtSql(wt, "create database db")
        replicaClause = ""
        if gConfig.max_replicas != 1:
            # random replica count: 1,2 ... N
            replicaClause = "replica {}".format(Dice.throw(gConfig.max_replicas) + 1)
        createSql = "create database {} {}".format(self._db.getName(), replicaClause)
        self.execWtSql(wt, createSql)
2021

2022
class TaskDropDb(StateTransitionTask):
    '''Drop the task's database; the resulting state is StateEmpty.'''

    @classmethod
    def getEndState(cls):
        return StateEmpty()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canDropDb()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        dropSql = "drop database {}".format(self._db.getName())
        self.execWtSql(wt, dropSql)
        logger.debug("[OPS] database dropped at {}".format(time.time()))
2034

2035
class TaskCreateSuperTable(StateTransitionTask):
    '''Create the fixed super table in the task's database; resulting state is StateSuperTableOnly.'''

    @classmethod
    def getEndState(cls):
        return StateSuperTableOnly()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canCreateFixedSuperTable()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        if not self._db.exists(wt.getDbConn()):
            logger.debug("Skipping task, no DB yet")
            return

        superTable = self._db.getFixedSuperTable()  # type: TdSuperTable
        columnDefs = {'ts': 'timestamp', 'speed': 'int'}
        tagDefs = {'b': 'binary(200)', 'f': 'float'}
        superTable.create(wt.getDbConn(), self._db.getName(), columnDefs, tagDefs)
        # No need to create the regular tables, INSERT will do that
        # automatically
2056

S
Steven Li 已提交
2057

2058 2059 2060 2061
class TdSuperTable:
    '''A TDengine super table identified by name, with helpers that act on it through a DB connection.'''

    def __init__(self, stName):
        self._stName = stName

    def getName(self):
        return self._stName

    # TODO: odd semantic, create() method is usually static?
    def create(self, dbc, dbName, cols: dict, tags: dict):
        '''Creating a super table'''
        colDefs = ",".join('%s %s' % pair for pair in cols.items())
        tagDefs = ",".join('%s %s' % pair for pair in tags.items())
        dbc.execute("CREATE TABLE {}.{} ({}) TAGS ({})".format(dbName, self._stName, colDefs, tagDefs))

    def getRegTables(self, dbc: DbConn, dbName: str):
        '''Return the first column of each row of "select TBNAME" on this super table.'''
        try:
            dbc.query("select TBNAME from {}.{}".format(dbName, self._stName))  # TODO: analyze result set later
        except taos.error.ProgrammingError as err:
            logger.debug("[=] Failed to get tables from super table: errno=0x{:X}, msg: {}".format(
                Helper.convertErrno(err.errno), err))
            raise
        return [row[0] for row in dbc.getQueryResult()]

    def hasRegTables(self, dbc: DbConn, dbName: str):
        '''Return whether "SELECT *" on this super table yields any row.'''
        rowCount = dbc.query("SELECT * FROM {}.{}".format(dbName, self._stName))
        return rowCount > 0

    def ensureTable(self, dbc: DbConn, dbName: str, regTableName: str):
        '''Create regular table `regTableName` from this super table unless it already exists.'''
        lookupSql = "select tbname from {}.{} where tbname in ('{}')".format(dbName, self._stName, regTableName)
        if dbc.query(lookupSql) >= 1:  # reg table exists already
            return
        tagValues = self._getTagStrForSql(dbc, dbName)
        dbc.execute("CREATE TABLE {}.{} USING {}.{} tags ({})".format(
            dbName, regTableName, dbName, self._stName, tagValues))

    def _getTagStrForSql(self, dbc, dbName: str):
        '''Return a comma-separated SQL list with one sample value per tag of the super table.'''
        sampleByType = {
            'BINARY': "'Beijing-Shanghai-LosAngeles'",
            'FLOAT': '9.9',
            'INT': '88',
        }
        values = []
        for tagType in self._getTags(dbc, dbName).values():
            if tagType not in sampleByType:
                raise RuntimeError("Unexpected tag type: {}".format(tagType))
            values.append(sampleByType[tagType])
        return ", ".join(values)

    def _getTags(self, dbc, dbName) -> dict:
        '''Return {tag name: type} for the rows of DESCRIBE whose 4th column is 'TAG'.'''
        dbc.query("DESCRIBE {}.{}".format(dbName, self._stName))
        return {row[0]: row[1] for row in dbc.getQueryResult() if row[3] == 'TAG'}  # name:type

    def addTag(self, dbc, dbName, tagName, tagType):
        '''Add tag `tagName` of `tagType`, unless the super table already has it.'''
        if tagName in self._getTags(dbc, dbName):  # already
            return
        dbc.execute("alter table {}.{} add tag {} {}".format(dbName, self._stName, tagName, tagType))

    def dropTag(self, dbc, dbName, tagName):
        '''Drop tag `tagName`, if the super table has it.'''
        if tagName not in self._getTags(dbc, dbName):  # don't have this tag
            return
        dbc.execute("alter table {}.{} drop tag {}".format(dbName, self._stName, tagName))

    def changeTag(self, dbc, dbName, oldTag, newTag):
        '''Rename tag `oldTag` to `newTag`, only if `oldTag` exists and `newTag` does not.'''
        existingTags = self._getTags(dbc, dbName)
        if oldTag not in existingTags or newTag in existingTags:
            return
        dbc.execute("alter table {}.{} change tag {} {}".format(dbName, self._stName, oldTag, newTag))

2144
class TaskReadData(StateTransitionTask):
    '''Run a randomly chosen aggregate query on each regular table, then on the fixed super table.'''

    @classmethod
    def getEndState(cls):
        return None  # meaning doesn't affect state

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canReadData()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        sTable = self._db.getFixedSuperTable()

        # 1 in 5 chance, simulate a broken connection.
        if random.randrange(5) == 0:  # TODO: break connection in all situations
            wt.getDbConn().close()
            wt.getDbConn().open()
            print("_r", end="", flush=True)

        dbc = wt.getDbConn()
        dbName = self._db.getName()
        for rTbName in sTable.getRegTables(dbc, dbName):  # regular tables
            aggExpr = Dice.choice([
                '*',
                'count(*)',
                'avg(speed)',
                # 'twa(speed)', # TODO: this one REQUIRES a where statement, not reasonable
                'sum(speed)',
                'stddev(speed)',
                # SELECTOR functions
                'min(speed)',
                'max(speed)',
                'first(speed)',
                'last(speed)',
                'top(speed, 50)', # TODO: not supported?
                'bottom(speed, 50)', # TODO: not supported?
                'apercentile(speed, 10)', # TODO: TD-1316
                'last_row(speed)',
                # Transformation Functions
                # 'diff(speed)', # TODO: no supported?!
                'spread(speed)'
                ])  # TODO: add more from 'top'
            # Not applied to the queries yet; still drawn so the Dice sequence is kept
            filterExpr = Dice.choice([  # TODO: add various kind of WHERE conditions
                None
            ])

            # Run the query against the regular table first, then the super table
            queries = ["select {} from {}.{}".format(aggExpr, dbName, rTbName)]
            if aggExpr not in ['stddev(speed)']:  # TODO: STDDEV not valid for super tables?!
                queries.append("select {} from {}.{}".format(aggExpr, dbName, sTable.getName()))
            try:
                for sql in queries:
                    dbc.execute(sql)
            except taos.error.ProgrammingError as err:
                errno2 = Helper.convertErrno(err.errno)
                logger.debug("[=] Read Failure: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, dbc.getLastSql()))
                raise
S
Shuduo Sang 已提交
2198

2199
class TaskDropSuperTable(StateTransitionTask):
    """
    Drop the fixed super table of the current database, ending in the
    "database only" state.

    With a 1-in-2 chance the regular tables are first dropped one at a time,
    in shuffled order, before the super table itself is dropped.
    """

    @classmethod
    def getEndState(cls):
        return StateDbOnly()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canDropFixedSuperTable()

    def _dropRegTablesShuffled(self, wt: WorkerThread):
        """
        Drop regular tables individually, in random order, printing a single
        progress tick: "d" if no acceptable failure preceded the first
        successful drop, "f" otherwise.
        """
        tableCount = self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES
        # Two more indexes than the configured table count are attempted
        tableIndexes = list(range(tableCount + 2))
        random.shuffle(tableIndexes)

        allDropsOk = True    # becomes False after the first acceptable (0x362) failure
        tickPrinted = False  # only one "d"/"f" character per task
        for idx in tableIndexes:
            regTableName = self.getRegTableName(idx)  # "db.reg_table_{}".format(idx)
            try:
                self.execWtSql(wt, "drop table {}.{}".
                    format(self._db.getName(), regTableName))  # nRows always 0, like MySQL
            except taos.error.ProgrammingError as err:
                # correcting for strange error number scheme
                if Helper.convertErrno(err.errno) in [0x362]:  # mnode invalid table name
                    allDropsOk = False
                    logger.debug("[DB] Acceptable error when dropping a table")
                # NOTE(review): any ProgrammingError, not just 0x362, moves on to the next table
                continue

            if tickPrinted:
                continue
            tickPrinted = True  # Print only one time
            print("d" if allDropsOk else "f", end="", flush=True)

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        if Dice.throw(2) == 0:  # 1 in 2 chance
            self._dropRegTablesShuffled(wt)

        # Finally, drop the super table itself
        tblName = self._db.getFixedSuperTableName()
        self.execWtSql(wt, "drop table {}.{}".format(self._db.getName(), tblName))
2240

S
Shuduo Sang 已提交
2241

2242 2243 2244
class TaskAlterTags(StateTransitionTask):
    """
    Apply one randomly chosen tag alteration to the fixed super table:
    add "extraTag" (int), drop "extraTag", drop "newTag", or rename
    "extraTag" to "newTag". The system state is not affected.
    """

    @classmethod
    def getEndState(cls):
        return None  # meaning doesn't affect state

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        # Tags may be altered whenever the super table could be dropped
        return state.canDropFixedSuperTable()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        dbc = wt.getDbConn()
        sTable = self._db.getFixedSuperTable()
        dbName = self._db.getName()

        # Index == dice value; SQL equivalents shown for reference
        alterations = [
            # alter table db.<stb> add tag extraTag int
            lambda: sTable.addTag(dbc, dbName, "extraTag", "int"),
            # alter table db.<stb> drop tag extraTag
            lambda: sTable.dropTag(dbc, dbName, "extraTag"),
            # alter table db.<stb> drop tag newTag
            lambda: sTable.dropTag(dbc, dbName, "newTag"),
            # alter table db.<stb> change tag extraTag newTag
            lambda: sTable.changeTag(dbc, dbName, "extraTag", "newTag"),
        ]
        alterations[Dice.throw(4)]()

class TaskRestartService(StateTransitionTask):
    """
    Occasionally (1 in CHANCE_TO_RESTART_SERVICE executions) restart the
    TDengine service. This only happens when crash_gen manages the service
    itself (gConfig.auto_start_service).

    At most one instance may be inside the restart section at a time. The
    guard flag lives on the class and is only changed under _classLock.
    """
    _isRunning = False  # class-wide: is some instance currently executing?
    _classLock = threading.Lock()

    CHANCE_TO_RESTART_SERVICE = 200

    @classmethod
    def getEndState(cls):
        return None  # meaning doesn't affect state

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        if gConfig.auto_start_service:
            return state.canDropFixedSuperTable()  # Basically when we have the super table
        return False # don't run this otherwise

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        if not gConfig.auto_start_service: # only execute when we are in -a mode
            print("_a", end="", flush=True)
            return

        with TaskRestartService._classLock:
            if TaskRestartService._isRunning:
                print("Skipping restart task, another running already")
                return
            # Must assign on the class: "self._isRunning = True" would only
            # create an instance attribute, invisible to other task instances.
            TaskRestartService._isRunning = True

        try:
            if Dice.throw(self.CHANCE_TO_RESTART_SERVICE) == 0: # 1 in N chance
                dbc = wt.getDbConn()
                dbc.execute("show databases") # simple delay, align timing with other workers
                gSvcMgr.restart()
        finally:
            # Release the guard even if the restart raised, else no restart could ever run again
            with TaskRestartService._classLock:
                TaskRestartService._isRunning = False
S
Shuduo Sang 已提交
2302

2303
class TaskAddData(StateTransitionTask):
    """
    Insert records into every regular table of the fixed super table,
    visiting the tables in shuffled order.

    Optionally, when gConfig.record_ops is set, each insert is recorded to
    two files (useful for power-off tests). When gConfig.verify_data is
    set, each value is read back and checked.
    """

    # Track which table is being actively worked on (shared by all instances)
    activeTable: Set[int] = set()
    # Makes "is the table active? if not, mark it" atomic across worker threads
    _activeTableLock = threading.Lock()

    # We use these two files to record operations to DB, useful for power-off
    # tests
    fAddLogReady = None
    fAddLogDone = None

    @classmethod
    def prepToRecordOps(cls):
        """Open the two operation-recording files once, if gConfig.record_ops is set."""
        if gConfig.record_ops:
            if (cls.fAddLogReady is None):
                logger.info(
                    "Recording in a file operations to be performed...")
                cls.fAddLogReady = open("add_log_ready.txt", "w")

            if (cls.fAddLogDone is None):
                logger.info("Recording in a file operations completed...")
                cls.fAddLogDone = open("add_log_done.txt", "w")

    @classmethod
    def getEndState(cls):
        return StateHasData()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canAddData()

    @classmethod
    def _claimTable(cls, i) -> bool:
        """
        Mark table #i as active.

        Returns True if this call marked it, or False if it was already
        marked by another task.
        """
        with cls._activeTableLock:
            if i in cls.activeTable:
                return False
            cls.activeTable.add(i)
            return True

    @classmethod
    def _releaseTable(cls, i):
        """Clear the active mark of table #i."""
        with cls._activeTableLock:
            cls.activeTable.discard(i)  # not raising an error, unlike remove

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        # ds = self._dbManager # Quite DANGEROUS here, may result in multi-thread client access
        db = self._db
        dbc = wt.getDbConn()
        tblSeq = list(range(
                self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES))
        random.shuffle(tblSeq)
        for i in tblSeq:
            claimed = self._claimTable(i)
            if not claimed:  # wow already active
                print("x", end="", flush=True) # concurrent insertion
            try:
                self._addDataToTable(te, wt, db, dbc, i)
            finally:
                # Release even when an insert fails (e.g. the table was dropped
                # meanwhile), but never clear a mark owned by another task.
                if claimed:
                    self._releaseTable(i)

    def _addDataToTable(self, te: TaskExecutor, wt: WorkerThread, db, dbc, i):
        """Ensure regular table #i exists, then insert, record and optionally verify its records."""
        sTable = db.getFixedSuperTable()
        regTableName = self.getRegTableName(i)  # "db.reg_table_{}".format(i)
        sTable.ensureTable(wt.getDbConn(), db.getName(), regTableName)  # Ensure the table exists

        numRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS
        for _ in range(numRecords):  # number of records per table
            nextInt = db.getNextInt()
            nextTick = db.getNextTick()
            if gConfig.record_ops:
                self.prepToRecordOps()
                self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName))
                self.fAddLogReady.flush()
                os.fsync(self.fAddLogReady)

            sql = "insert into {}.{} values ('{}', {});".format( # removed: tags ('{}', {})
                db.getName(),
                regTableName,
                nextTick, nextInt)
            dbc.execute(sql)
            # Successfully wrote the data into the DB, let's record it
            # somehow
            te.recordDataMark(nextInt)
            if gConfig.record_ops:
                self.fAddLogDone.write(
                    "Wrote {} to {}\n".format(
                        nextInt, regTableName))
                self.fAddLogDone.flush()
                os.fsync(self.fAddLogDone)

            # Now read it back and verify, we might encounter an error if table is dropped
            if gConfig.verify_data: # only if command line asks for it
                self._verifyReadBack(dbc, db, regTableName, nextTick, nextInt)

    def _verifyReadBack(self, dbc, db, regTableName, nextTick, nextInt):
        """
        Read back the "speed" value at nextTick and compare it with nextInt.

        Raises taos.error.ProgrammingError in these cases:
        - errno 0x999 on a value mismatch;
        - errno 0x991 for an empty result, or 0x992 for multiple results;
        - the original error for any other query failure.
        """
        try:
            readBack = dbc.queryScalar("SELECT speed from {}.{} WHERE ts= '{}'".
                format(db.getName(), regTableName, nextTick))
            if readBack != nextInt :
                raise taos.error.ProgrammingError(
                    "Failed to read back same data, wrote: {}, read: {}"
                    .format(nextInt, readBack), 0x999)
        except taos.error.ProgrammingError as err:
            errno = Helper.convertErrno(err.errno)
            if errno in [0x991, 0x992]  : # not a single result
                raise taos.error.ProgrammingError(
                    "Failed to read back same data for tick: {}, wrote: {}, read: {}"
                    .format(nextTick, nextInt, "Empty Result" if errno==0x991 else "Multiple Result"),
                    errno)
            # Re-throw no matter what
            raise
2394 2395


S
Steven Li 已提交
2396 2397
# Deterministic random number generator
class Dice():
    """
    Deterministic random number source for crash_gen, backed by the global
    `random` module so that a run is reproducible from a single seed.

    seed() must be called exactly once before throw()/throwRange().
    choice() does not check for seeding.
    """
    seeded = False  # static, uninitialized

    @classmethod
    def seed(cls, s):  # static
        """Verify the RNG, then seed it with s. Raises RuntimeError if called twice."""
        if cls.seeded:
            raise RuntimeError(
                "Cannot seed the random generator more than once")
        cls.verifyRNG()
        random.seed(s)
        cls.seeded = True  # TODO: protect against multi-threading

    @classmethod
    def verifyRNG(cls):  # Verify that the RNG is determinstic
        """
        Check that seed 0 produces the known values 864, 394, 776.

        Side effect: re-seeds the global generator with 0.
        Raises RuntimeError if the values differ.
        """
        random.seed(0)
        firstThree = tuple(random.randrange(0, 1000) for _ in range(3))
        if firstThree != (864, 394, 776):
            raise RuntimeError("System RNG is not deterministic")

    @classmethod
    def throw(cls, stop):  # get 0 to stop-1
        """Return a random int in [0, stop)."""
        return cls.throwRange(0, stop)

    @classmethod
    def throwRange(cls, start, stop):  # up to stop-1
        """Return a random int in [start, stop). Raises RuntimeError if not seeded."""
        if cls.seeded:
            return random.randrange(start, stop)
        raise RuntimeError("Cannot throw dice before seeding it")

    @classmethod
    def choice(cls, cList):
        """Return a random element of the non-empty sequence cList."""
        return random.choice(cList)

S
Steven Li 已提交
2432

S
Steven Li 已提交
2433 2434
class LoggingFilter(logging.Filter):
    """
    Logging filter hook for crash_gen. Every record is currently allowed
    through. The below-INFO branch is where noisy debug messages (e.g. those
    starting with "[TRD]") could be suppressed.
    """

    def filter(self, record: logging.LogRecord):
        if record.levelno < logging.INFO:
            # Debug-level record: filtering rules are disabled, so keep it
            return True
        return True  # info or above always log

S
Shuduo Sang 已提交
2444 2445

class MyLoggingAdapter(logging.LoggerAdapter):
    """
    Logger adapter that prefixes each message with a short thread tag: the
    current thread identifier modulo 10000, in square brackets.
    """

    def process(self, msg, kwargs):
        threadTag = threading.get_ident() % 10000  # keep the tag short
        taggedMsg = "[{}]{}".format(threadTag, msg)
        return taggedMsg, kwargs

S
Shuduo Sang 已提交
2450

2451 2452 2453
class ServiceManager:
    """
    Manage the TDengine service run by crash_gen: one ServiceManagerThread
    (each owning one sub process) per dnode, numDnodes in total.

    Provides:
    - start/stop/restart of the whole service/cluster;
    - pumping of the sub processes' output;
    - interactive signal handlers.
    """
    PAUSE_BETWEEN_IPC_CHECK = 1.2  # seconds between checks on STDOUT of sub process

    def __init__(self, numDnodes = 1):
        logger.info("TDengine Service Manager (TSM) created")
        self._numDnodes = numDnodes # >1 means we have a cluster
        # Signal handlers are not installed here (moved to MainExec)

        self.inSigHandler = False  # guards against re-entering a signal handler

        # One manager thread per dnode, serial numbers 0..numDnodes-1
        self.svcMgrThreads = [ServiceManagerThread(i) for i in range(0, numDnodes)] # type: List[ServiceManagerThread]

        self._lock = threading.Lock()  # serializes startTaosServices()/stopTaosServices()

    def _doMenu(self):
        """Ask the user, on the console, what to do. Return "1", "2" or "3"."""
        while True:
            print("\nInterrupting Service Program, Choose an Action: ")
            print("1: Resume")
            print("2: Terminate")
            print("3: Restart")
            # Remember to update the valid choices below
            choice = ""
            while choice == "":  # keep asking on empty input
                choice = input("Enter Choice: ")
            if choice in ["1", "2", "3"]:
                return choice
            print("Invalid choice, please try again.")

    def sigUsrHandler(self, signalNumber, frame):
        """SIGUSR1 handler: show the interactive menu and act on the choice."""
        print("Interrupting main thread execution upon SIGUSR1")
        if self.inSigHandler:  # already
            print("Ignoring repeated SIG...")
            return  # do nothing if it's already not running
        self.inSigHandler = True
        try:
            choice = self._doMenu()
            if choice == "1":
                self.sigHandlerResume() # TODO: can the sub-process be blocked due to us not reading from queue?
            elif choice == "2":
                self.stopTaosServices()
            elif choice == "3": # Restart
                self.restart()
            else:
                raise RuntimeError("Invalid menu choice: {}".format(choice))
        finally:
            self.inSigHandler = False  # re-arm even if the chosen action raised

    def sigIntHandler(self, signalNumber, frame):
        """SIGINT handler: stop all services (repeated signals are ignored meanwhile)."""
        print("ServiceManager: INT Signal Handler starting...")
        if self.inSigHandler:
            print("Ignoring repeated SIG_INT...")
            return
        self.inSigHandler = True
        try:
            self.stopTaosServices()
            print("ServiceManager: INT Signal Handler returning...")
        finally:
            self.inSigHandler = False  # re-arm even if stopping raised

    def sigHandlerResume(self):
        print("Resuming TDengine service manager (main thread)...\n\n")

    def isActive(self):
        """
        Determine if the service/cluster is active at all, i.e. at least
        one thread is not "stopped".
        """
        return any(not thread.isStopped() for thread in self.svcMgrThreads)

    def isStable(self):
        """
        Determine if the service/cluster is "stable", i.e. all of the
        threads are in "stable" status.
        """
        return all(thread.isStable() for thread in self.svcMgrThreads)

    def _procIpcAll(self):
        """
        Pump the output of all sub processes until every manager thread has
        stopped. Pauses PAUSE_BETWEEN_IPC_CHECK seconds between rounds.
        """
        while self.isActive():
            for thread in self.svcMgrThreads: # all thread objects should always be valid
                if thread.isRunning():
                    thread.procIpcBatch()  # regular processing,
                    if thread.isStopped():  # it stopped while we were processing
                        thread.procIpcBatch() # one last time?
                elif not thread.isStable():
                    # Starting or stopping, e.g. in the middle of restart()
                    print("Service restarting...")
                # else this thread is stopped
            time.sleep(self.PAUSE_BETWEEN_IPC_CHECK)  # pause, before next round
        print("Service Manager Thread (with subprocess) ended, main thread exiting...")

    def startTaosServices(self):
        """
        Kill any pre-existing 'taosd' process, then start all dnodes.

        Raises RuntimeError if some of our own services may already be running.
        """
        with self._lock:
            if self.isActive():
                raise RuntimeError("Cannot start TAOS service(s) when one/some may already be running")

            # Find if there's already a taosd service, and then kill it
            for proc in psutil.process_iter():
                try:
                    procName = proc.name()
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    continue  # process vanished meanwhile, or we may not inspect it
                if procName == 'taosd':
                    print("Killing an existing TAOSD process in 2 seconds... press CTRL-C to interrupe")
                    time.sleep(2.0)
                    try:
                        proc.kill()
                    except psutil.NoSuchProcess:
                        pass  # it already exited during the grace period

            for thread in self.svcMgrThreads:
                thread.start()
                thread.procIpcBatch(trimToTarget=10, forceOutput=True)  # for printing 10 lines

    def stopTaosServices(self):
        """Stop all dnodes. Logs a warning and returns if none is active."""
        with self._lock:
            if not self.isActive():
                logger.warning("Cannot stop TAOS service(s), already not active")
                return

            for thread in self.svcMgrThreads:
                thread.stop()

    def run(self):
        """Start the services, pump their output until they stop, then clean up."""
        self.startTaosServices()
        self._procIpcAll()  # pump/process all the messages, may encounter SIG + restart
        if self.isActive():  # if sig handler hasn't destroyed it by now
            self.stopTaosServices()  # should have started already

    def restart(self):
        """Stop (if active) and start again all dnodes. Skipped unless every thread is stable."""
        if not self.isStable():
            logger.warning("Cannot restart service/cluster, when not stable")
            return

        if self.isActive():
            self.stopTaosServices()
        else:
            logger.warning("Service not active when restart requested")

        self.startTaosServices()
2628

2629
class ServiceManagerThread:
    """
    A class representing a dedicated thread which manages the "sub process"
    of the TDengine service, interacting with its STDOUT/ERR.

    It takes a TdeInstance parameter at creation time, or creates a default
    one. Two daemon threads are used:
    - one reads the sub process STDOUT into an IPC queue and detects readiness;
    - the other echoes its STDERR.
    """

    MAX_QUEUE_SIZE = 10000

    def __init__(self, tInstNum = 0, tInst : TdeInstance = None):
        # Set the sub process
        self._tdeSubProcess = None # type: TdeSubProcess

        # Arrange the TDengine instance
        self._tInstNum = tInstNum # instance serial number in cluster, ZERO based
        self._tInst    = tInst or TdeInstance() # Need an instance

        self._thread = None   # STDOUT reader thread, # type: threading.Thread
        self._thread2 = None  # STDERR reader thread, # type: threading.Thread
        # Created up-front so that procIpcBatch()/_trimQueue() work even before
        # start(). start() replaces it with a fresh queue.
        self._ipcQueue = Queue()
        self._status = MainExec.STATUS_STOPPED # The status of the underlying service, actually.
        self._progressIdx = 0  # next frame of the progress spinner

    def __repr__(self):
        return "[SvcMgrThread: tInstNum={}]".format(self._tInstNum)

    def getStatus(self):
        return self._status

    def isStarting(self):
        return self._status == MainExec.STATUS_STARTING

    def isRunning(self):
        return self._status == MainExec.STATUS_RUNNING

    def isStopping(self):
        return self._status == MainExec.STATUS_STOPPING

    def isStopped(self):
        return self._status == MainExec.STATUS_STOPPED

    def isStable(self):
        return self.isRunning() or self.isStopped()

    def start(self):
        """
        Start the sub process plus its two IO reader threads. Then wait,
        polling once per second up to 100 times, for svcOutputReader() to
        mark the service as running.

        Raises RuntimeError if already started, or if the service does not
        become ready in time.
        """
        if self._thread:
            raise RuntimeError("Unexpected _thread")
        if self._tdeSubProcess:
            raise RuntimeError("TDengine sub process already created/running")

        logger.info("Attempting to start TAOS service: {}".format(self))

        self._status = MainExec.STATUS_STARTING
        self._tdeSubProcess = TdeSubProcess(self._tInst)
        self._tdeSubProcess.start()

        self._ipcQueue = Queue()
        self._thread = threading.Thread( # First thread captures server OUTPUT
            target=self.svcOutputReader,
            args=(self._tdeSubProcess.getStdOut(), self._ipcQueue))
        self._thread.daemon = True  # thread dies with the program
        self._thread.start()

        self._thread2 = threading.Thread( # 2nd thread captures server ERRORs
            target=self.svcErrorReader,
            args=(self._tdeSubProcess.getStdErr(), self._ipcQueue))
        self._thread2.daemon = True  # thread dies with the program
        self._thread2.start()

        # wait for service to start
        for _ in range(0, 100):
            time.sleep(1.0)
            # self.procIpcBatch() # don't pump message during start up
            print("_zz_", end="", flush=True)
            if self._status == MainExec.STATUS_RUNNING:
                logger.info("[] TDengine service READY to process requests")
                logger.info("[] TAOS service started: {}".format(self))
                return  # now we've started
        # TODO: handle failure-to-start  better?
        self.procIpcBatch(100, True) # display output before cronking out: trim to last 100 msgs, force output
        raise RuntimeError("TDengine service did not start successfully: {}".format(self))

    def stop(self):
        """
        Stop the sub process. If it actually exited, join the reader threads
        and print its last output lines.

        This is a no-op when already stopped or stopping.
        """
        # can be called from both main thread or signal handler
        print("Terminating TDengine service running as the sub process...")
        if self.isStopped():
            print("Service already stopped")
            return
        if self.isStopping():
            print("Service is already being stopped")
            return
        # Linux will send Control-C generated SIGINT to the TDengine process
        # already, ref:
        # https://unix.stackexchange.com/questions/176235/fork-and-how-signals-are-delivered-to-processes
        if not self._tdeSubProcess:
            raise RuntimeError("sub process object missing")

        self._status = MainExec.STATUS_STOPPING
        retCode = self._tdeSubProcess.stop()
        print("Attempted to stop sub process, got return code: {}".format(retCode))
        # NOTE(review): -11 presumably means "killed by signal 11" (Popen convention)
        if (retCode==-11): # SGV
            logger.error("[[--ERROR--]]: TDengine service SEGV fault (check core file!)")

        if self._tdeSubProcess.isRunning():  # still running
            print("FAILED to stop sub process, it is still running... pid = {}".format(
                    self._tdeSubProcess.getPid()))
        else:
            self._tdeSubProcess = None  # not running any more
            self.join()  # stop the thread, change the status, etc.

        # Check if it's really stopped
        outputLines = 20 # for last output
        if self.isStopped():
            self.procIpcBatch(outputLines)  # one last time
            print("End of TDengine Service Output: {}".format(self))
            print("----- TDengine Service (managed by SMT) is now terminated -----\n")
        else:
            print("WARNING: SMT did not terminate as expected: {}".format(self))

    def join(self):
        """
        Wait for the IO reader threads and mark the service STOPPED.

        Must be called while STOPPING; raises RuntimeError otherwise.
        """
        # TODO: sanity check
        if not self.isStopping():
            raise RuntimeError(
                "Unexpected status when ending svc mgr thread: {}".format(
                    self._status))

        if self._thread:
            self._thread.join()
            self._thread = None
            self._status = MainExec.STATUS_STOPPED
            # STD ERR thread
            if self._thread2:
                self._thread2.join()
                self._thread2 = None
        else:
            print("Joining empty thread, doing nothing")

    def _trimQueue(self, targetSize):
        """Drop the oldest queued output lines until at most targetSize remain (<= 0: no-op)."""
        if targetSize <= 0:
            return  # do nothing
        q = self._ipcQueue
        if (q.qsize() <= targetSize):  # no need to trim
            return

        logger.debug("Triming IPC queue to target size: {}".format(targetSize))
        itemsToTrim = q.qsize() - targetSize
        for _ in range(0, itemsToTrim):
            try:
                q.get_nowait()
            except Empty:
                break  # break out of for loop, no more trimming

    TD_READY_MSG = "TDengine is initialized successfully"

    def procIpcBatch(self, trimToTarget=0, forceOutput=False):
        """
        Log every output line currently queued by the reader threads.

        Lines are logged at INFO if forceOutput is set, else at DEBUG. The
        queue is first trimmed to trimToTarget lines when that is positive.
        """
        self._trimQueue(trimToTarget)  # trim if necessary
        # Process all the output generated by the underlying sub process,
        # managed by IO thread
        print("<", end="", flush=True)
        while True:
            try:
                line = self._ipcQueue.get_nowait()  # getting output at fast speed
            except Empty:
                # no more output
                print(".>", end="", flush=True)
                return  # we are done with THIS BATCH
            self._printProgress("_o")
            if forceOutput:
                logger.info(line)
            else:
                logger.debug(line)

    _ProgressBars = ["--", "//", "||", "\\\\"]

    def _printProgress(self, msg):  # TODO: assuming 2 chars
        """Print msg followed by a rotating 2-char spinner frame, then back up 4 columns."""
        print(msg, end="", flush=True)
        # Rotate the spinner deterministically rather than via Dice. This also
        # runs in the IO reader thread, and drawing from the shared seeded RNG
        # there would perturb the sequence the worker threads see. Dice would
        # also raise if it were not seeded yet.
        pBar = self._ProgressBars[self._progressIdx % len(self._ProgressBars)]
        self._progressIdx += 1
        print(pBar, end="", flush=True)
        print('\b\b\b\b', end="", flush=True)

    def svcOutputReader(self, out: IO, queue):
        """
        Reader-thread body: decode each STDOUT line and queue it for
        procIpcBatch(). While starting, switch the status to RUNNING once the
        ready message appears. The stream is closed at EOF.
        """
        # Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
        for line in iter(out.readline, b''):
            try:
                line = line.decode("utf-8").rstrip()
            except UnicodeError:
                print("\nNon-UTF8 server output: {}\n".format(line))
                # Fall back to a lossy decode: the code below needs a str
                # (bytes.find() with a str argument raises TypeError)
                line = line.decode("utf-8", errors="replace").rstrip()

            # This might block, and then causing "out" buffer to block
            queue.put(line)
            self._printProgress("_i")

            if self._status == MainExec.STATUS_STARTING:  # we are starting, let's see if we have started
                if line.find(self.TD_READY_MSG) != -1:  # found
                    logger.info("Waiting for the service to become FULLY READY")
                    time.sleep(1.0) # wait for the server to truly start. TODO: remove this
                    logger.info("Service instance #{} is now FULLY READY".format(self._tInstNum))
                    self._status = MainExec.STATUS_RUNNING

            # Trim the queue if necessary: TODO: try this 1 out of 10 times
            self._trimQueue(self.MAX_QUEUE_SIZE * 9 // 10)  # trim to 90% size

            if self.isStopping():  # TODO: use thread status instead
                # WAITING for stopping sub process to finish its output
                print("_w", end="", flush=True)

        # meaning sub process must have died
        print("\nNo more output from IO thread managing TDengine service")
        out.close()

    def svcErrorReader(self, err: IO, queue):
        """Reader-thread body: echo each STDERR line of the sub process, then close the stream."""
        for line in iter(err.readline, b''):
            # Decode for display (instead of printing a b'...' repr); stray bytes are replaced
            line = line.decode("utf-8", errors="replace").rstrip()
            print("\nTDengine Service (taosd) ERROR (from stderr): {}".format(line))
        err.close()
2849

2850 2851

class TdeSubProcess:
    """
    A class to represent the actual sub process that is the run-time
    of a TDengine instance.

    It takes a TdeInstance object as its parameter, with the rationale being
    "a sub process runs an instance".
    """

    def __init__(self, tInst : TdeInstance):
        """
        :param tInst: the TDengine instance this sub process runs; must not be None
        :raises CrashGenError: if tInst is None
        """
        self.subProcess = None  # subprocess.Popen handle, set by start()
        if tInst is None:
            raise CrashGenError("Empty instance not allowed in TdeSubProcess")
        self._tInst = tInst # Default create at ServiceManagerThread

    def getStdOut(self):
        """Return the stdout pipe of the started sub process."""
        return self.subProcess.stdout

    def getStdErr(self):
        """Return the stderr pipe of the started sub process."""
        return self.subProcess.stderr

    def isRunning(self):
        """
        Return True while we hold a sub process handle.

        NOTE: this reflects our own bookkeeping only; it does not poll the OS,
        so a process that died on its own still reports True until stop().
        """
        return self.subProcess is not None

    def getPid(self):
        """Return the OS process id of the started sub process."""
        return self.subProcess.pid

    def start(self):
        """
        Generate the instance config file, rotate its logs, then launch the
        instance's service command with stdout/stderr piped in binary mode.

        :raises RuntimeError: if a sub process handle is already held
        """
        ON_POSIX = 'posix' in sys.builtin_module_names

        # Sanity check
        if self.subProcess:  # already there
            raise RuntimeError("Corrupt process state")

        self._tInst.generateCfgFile() # service side generates config file, client does not
        self._tInst.rotateLogs()

        print("Starting TDengine instance: {}".format(self._tInst))
        self.subProcess = subprocess.Popen(
            self._tInst.getServiceCommand(),
            shell=False,
            # svcCmdSingle, shell=True, # capture core dump?
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            # bufsize=1, # not supported in binary mode
            close_fds=ON_POSIX
            )  # had text=True, which interfered with reading EOF

    def stop(self):
        """
        Stop the sub process, if there is one.

        :returns:
            -1  if no sub process handle is held (never started / already stopped);
            the sub process's own exit code if it had already exited
                (0 for a clean exit, negative N if killed by signal N);
            -4  if it terminated after we sent it SIGINT;
            -3  if it did not exit within 10 seconds of SIGINT; the handle
                is kept in that case so stop() can be called again.
        """
        if not self.subProcess:
            print("Sub process already stopped")
            return -1

        # poll() is None while the process is alive, otherwise its exit code.
        # An exit code of 0 is falsy, so compare against None explicitly.
        retCode = self.subProcess.poll()
        if retCode is not None:  # process already ended
            self.subProcess = None
            return retCode

        # process still alive, let's interrupt it
        print(
            "Sub process is running, sending SIG_INT and waiting for it to terminate...")
        # sub process should end, then IPC queue should end, causing IO
        # thread to end
        self.subProcess.send_signal(signal.SIGINT)
        try:
            self.subProcess.wait(10)
        except subprocess.TimeoutExpired:
            print("Time out waiting for TDengine service process to exit")
            return -3

        print("TDengine service process terminated successfully from SIG_INT")
        self.subProcess = None
        return -4
2943

2944 2945 2946 2947 2948 2949 2950 2951 2952 2953 2954 2955 2956 2957 2958 2959 2960 2961 2962 2963 2964 2965 2966 2967 2968 2969 2970 2971
class ThreadStacks: # stack info for all threads
    """
    Snapshot of the Python call stack of every live thread, captured when the
    object is constructed and keyed by each thread's native_id.
    """

    # Innermost-frame function names skipped by print(filterInternal=True):
    # threads that are just waiting, or that belong to this tooling itself.
    _INTERNAL_FRAME_NAMES = (
        'wait', 'invoke_excepthook',
        '_wait', # The Barrier exception
        'svcOutputReader', # the svcMgr thread
        '__init__', # the thread that extracted the stack
    )

    def __init__(self):
        self._allStacks = {}  # native_id -> traceback.StackSummary
        allFrames = sys._current_frames()
        for th in threading.enumerate():
            frame = allFrames.get(th.ident)
            if frame is None:
                # The thread started after the frame snapshot above, so there
                # is no frame for it; skip it instead of raising KeyError.
                continue
            self._allStacks[th.native_id] = traceback.extract_stack(frame)

    def print(self, filteredEndName = None, filterInternal = False):
        """
        Print the captured stacks, outermost frame first.

        :param filteredEndName: if given, skip threads whose innermost frame is
            a function with exactly this name
        :param filterInternal: if True, skip threads whose innermost frame is
            one of _INTERNAL_FRAME_NAMES
        """
        for thNid, stack in self._allStacks.items(): # for each thread
            if not stack:
                continue
            lastFrame = stack[-1]
            if filteredEndName and lastFrame.name == filteredEndName: # end matched, skip it
                continue
            if filterInternal and lastFrame.name in self._INTERNAL_FRAME_NAMES:
                continue # ignore
            # Now print
            print("\n<----- Thread Info for ID: {}".format(thNid))
            for frame in stack:
                print("File {filename}, line {lineno}, in {name}".format(
                    filename=frame.filename, lineno=frame.lineno, name=frame.name))
                print("    {}".format(frame.line))
            print("-----> End of Thread Info\n")
S
Shuduo Sang 已提交
2972

2973 2974 2975
class ClientManager:
    """
    Runs the crash_gen client. It creates the default TdeInstance, a DbManager,
    a ThreadPool and a ThreadCoordinator, runs the coordinator, optionally stops
    the auto-started TDengine service, and reports the outcome.

    MainExec registers the process signal handlers and forwards SIGINT/SIGTERM
    and SIGUSR1 to the handlers here.
    """

    def __init__(self):
        print("Starting client manager")
        # Signal handlers are registered by MainExec, which forwards to us.

        self._status = MainExec.STATUS_RUNNING
        self.tc = None  # ThreadCoordinator, created in run()

        self.inSigHandler = False  # True while the SIGUSR1 menu is active

    def sigIntHandler(self, signalNumber, frame):
        """Ask the coordinator to stop on the first SIGINT/SIGTERM; force-exit on a repeat."""
        if self._status != MainExec.STATUS_RUNNING:
            print("Repeated SIGINT received, forced exit...")
            sys.exit(-1)
        self._status = MainExec.STATUS_STOPPING  # immediately set our status

        print("ClientManager: Terminating program...")
        self.tc.requestToStop()

    def _doMenu(self):
        """
        Prompt on stdin until the user enters a valid menu entry.

        :returns: the chosen entry: "1", "2" or "3"
        """
        while True:
            print("\nInterrupting Client Program, Choose an Action: ")
            print("1: Resume")
            print("2: Terminate")
            print("3: Show Threads")
            # Remember to update the valid-choice check below
            choice = ""
            while choice == "":  # re-prompt on empty input
                choice = input("Enter Choice: ")
            if choice in ("1", "2", "3"):
                return choice
            print("Invalid choice, please try again.")

    def sigUsrHandler(self, signalNumber, frame):
        """
        Pause the main thread on SIGUSR1 and show an interactive menu.
        A SIGUSR1 that arrives while the menu is already active is ignored.
        """
        print("Interrupting main thread execution upon SIGUSR1")
        if self.inSigHandler:  # already
            print("Ignoring repeated SIG_USR1...")
            return  # do nothing if it's already not running
        self.inSigHandler = True
        try:
            choice = self._doMenu()
            if choice == "1":
                print("Resuming execution...")
                time.sleep(1.0)
            elif choice == "2":
                print("Not implemented yet")
                time.sleep(1.0)
            elif choice == "3":
                ts = ThreadStacks()
                ts.print()
            else:
                raise RuntimeError("Invalid menu choice: {}".format(choice))
        finally:
            # Always re-arm, even if input() raised (e.g. EOFError with a closed
            # stdin); otherwise every later SIGUSR1 would be silently ignored.
            self.inSigHandler = False

    # TODO: need to revise how we verify data durability
    # def _printLastNumbers(self):  # to verify data durability
    #     dbManager = DbManager()
    #     dbc = dbManager.getDbConn()
    #     if dbc.query("show databases") <= 1:  # no database (we have a default called "log")
    #         return
    #     dbc.execute("use db")
    #     if dbc.query("show tables") == 0:  # no tables
    #         return

    #     sTbName = dbManager.getFixedSuperTableName()

    #     # get all regular tables
    #     # TODO: analyze result set later
    #     dbc.query("select TBNAME from db.{}".format(sTbName))
    #     rTables = dbc.getQueryResult()

    #     bList = TaskExecutor.BoundedList()
    #     for rTbName in rTables:  # regular tables
    #         dbc.query("select speed from db.{}".format(rTbName[0]))
    #         numbers = dbc.getQueryResult()
    #         for row in numbers:
    #             # print("<{}>".format(n), end="", flush=True)
    #             bList.add(row[0])

    #     print("Top numbers in DB right now: {}".format(bList))
    #     print("TDengine client execution is about to start in 2 seconds...")
    #     time.sleep(2.0)
    #     dbManager = None  # release?

    def run(self, svcMgr):
        """
        Run the client to completion.

        :param svcMgr: the ServiceManager whose service was auto-started, or
            None. If given, its service is stopped once the client is done,
            including when client setup or execution raises.
        :returns: 1 if the ThreadCoordinator reports failure, else 0
        """
        global gConfig
        global gContainer

        try:
            # Prepare Tde Instance
            tInst = gContainer.defTdeInstance = TdeInstance() # "subdir to hold the instance"

            dbManager = DbManager()  # Regular function
            thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps)
            self.tc = ThreadCoordinator(thPool, dbManager)

            print("Starting client instance to: {}".format(tInst))
            self.tc.run()
        finally:
            # Stop the auto-started service even if the client failed, so we
            # do not leave an orphaned taosd process behind.
            if svcMgr: # gConfig.auto_start_service:
                svcMgr.stopTaosService()
                svcMgr = None

        # Print exec status, etc., AFTER showing messages from the server
        self.conclude()

        # Linux return code: ref https://shapeshed.com/unix-exit-codes/
        ret = 1 if self.tc.isFailed() else 0
        self.tc.cleanup()

        # Release global configuration
        gConfig = None

        # Drop our references so gc.collect() below can reclaim the
        # coordinator, the thread pool and the DB manager.
        self.tc = None
        thPool = None
        dbManager = None

        gc.collect() # force garbage collection
        return ret

    def conclude(self):
        """Print the ThreadCoordinator's execution statistics."""
        # self.tc.getDbManager().cleanUp() # clean up first, so we can show ZERO db connections
        self.tc.printStats()
3111

3112
class MainExec:
    """
    Top-level driver. It installs the process signal handlers and then runs
    either the crash_gen client (optionally auto-starting a TDengine service)
    or a stand-alone TDengine service.
    """
    STATUS_STARTING = 1
    STATUS_RUNNING = 2
    STATUS_STOPPING = 3
    STATUS_STOPPED = 4

    def __init__(self):
        self._clientMgr = None  # ClientManager, set by runClient()
        self._svcMgr = None     # ServiceManager, set by runClient()/runService()

        signal.signal(signal.SIGTERM, self.sigIntHandler)
        signal.signal(signal.SIGINT,  self.sigIntHandler)
        signal.signal(signal.SIGUSR1, self.sigUsrHandler)  # different handler!

    def sigUsrHandler(self, signalNumber, frame):
        """Forward SIGUSR1 to the client manager, or to the service manager when running alone."""
        if self._clientMgr:
            self._clientMgr.sigUsrHandler(signalNumber, frame)
        elif self._svcMgr: # Only if no client mgr, we are running alone
            self._svcMgr.sigUsrHandler(signalNumber, frame)

    def sigIntHandler(self, signalNumber, frame):
        """Forward SIGINT/SIGTERM to the service manager and then the client manager, when present."""
        if self._svcMgr:
            self._svcMgr.sigIntHandler(signalNumber, frame)
        if self._clientMgr:
            self._clientMgr.sigIntHandler(signalNumber, frame)

    def runClient(self):
        """
        Run the client, first starting the TDengine service if
        gConfig.auto_start_service is set.

        :returns: the client's exit status, or None if the REST connection failed
        """
        global gSvcMgr
        if gConfig.auto_start_service:
            self._svcMgr = ServiceManager()
            gSvcMgr = self._svcMgr # hack alert
            self._svcMgr.startTaosService() # we start, don't run

        self._clientMgr = ClientManager()
        ret = None
        try:
            ret = self._clientMgr.run(self._svcMgr) # stop TAOS service inside
        except requests.exceptions.ConnectionError as err:
            # requests exceptions have no getMessage(); str(err) carries the
            # details (calling getMessage() would raise AttributeError here).
            logger.warning("Failed to open REST connection to DB: {}".format(err))
            # don't raise
        return ret

    def runService(self):
        """Run a stand-alone TDengine service until the service manager finishes."""
        global gSvcMgr
        self._svcMgr = ServiceManager()
        gSvcMgr = self._svcMgr # save it in a global variable TODO: hack alert

        self._svcMgr.run() # run to some end state
        self._svcMgr = None
        gSvcMgr = None
3162

S
Shuduo Sang 已提交
3163

S
Steven Li 已提交
3164

3165
def main():
    """
    Parse the command line into the global gConfig and set up the global logger.
    Then run either the TDengine service (-e) or the crash_gen client.

    :returns: the client's exit status; None in service mode (sys.exit(None)
        is treated as success)
    """
    # Super cool Python argument library:
    # https://docs.python.org/3/library/argparse.html
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=textwrap.dedent('''\
            TDengine Auto Crash Generator (PLEASE NOTICE the Prerequisites Below)
            ---------------------------------------------------------------------
            1. You build TDengine in the top level ./build directory, as described in official docs
            2. You run the server there before this script: ./build/bin/taosd -c test/cfg

            '''))

    # NOTE: keep each "(default: ...)" in the help text in sync with default=.
    parser.add_argument(
        '-a',
        '--auto-start-service',
        action='store_true',
        help='Automatically start/stop the TDengine service (default: false)')
    parser.add_argument(
        '-b',
        '--max-dbs',
        action='store',
        default=0,
        type=int,
        help='Maximum number of DBs to keep, set to disable dropping DB. (default: 0)')
    parser.add_argument(
        '-c',
        '--connector-type',
        action='store',
        default='native',
        type=str,
        help='Connector type to use: native, rest, or mixed (default: native)')
    parser.add_argument(
        '-d',
        '--debug',
        action='store_true',
        help='Turn on DEBUG mode for more logging (default: false)')
    parser.add_argument(
        '-e',
        '--run-tdengine',
        action='store_true',
        help='Run TDengine service in foreground (default: false)')
    parser.add_argument(
        '-i',
        '--max-replicas',
        action='store',
        default=1,
        type=int,
        help='Maximum number of replicas to use, when testing against clusters. (default: 1)')
    parser.add_argument(
        '-l',
        '--larger-data',
        action='store_true',
        help='Write larger amount of data during write operations (default: false)')
    parser.add_argument(
        '-p',
        '--per-thread-db-connection',
        action='store_true',
        help='Use a separate db connection per thread, instead of a single shared one (default: false)')
    parser.add_argument(
        '-r',
        '--record-ops',
        action='store_true',
        help='Use a pair of always-fsynced files to record operations performing + performed, for power-off tests (default: false)')
    parser.add_argument(
        '-s',
        '--max-steps',
        action='store',
        default=1000,
        type=int,
        help='Maximum number of steps to run (default: 1000)')
    parser.add_argument(
        '-t',
        '--num-threads',
        action='store',
        default=5,
        type=int,
        help='Number of threads to run (default: 5)')
    parser.add_argument(
        '-v',
        '--verify-data',
        action='store_true',
        help='Verify data written in a number of places by reading back (default: false)')
    parser.add_argument(
        '-x',
        '--continue-on-exception',
        action='store_true',
        help='Continue execution after encountering unexpected/disallowed errors/exceptions (default: false)')

    global gConfig
    gConfig = parser.parse_args()

    # Logging Stuff
    global logger
    _logger = logging.getLogger('CrashGen')  # real logger
    _logger.addFilter(LoggingFilter())
    ch = logging.StreamHandler()
    _logger.addHandler(ch)

    # Logging adapter, to be used as a logger
    logger = MyLoggingAdapter(_logger, [])

    if gConfig.debug:
        logger.setLevel(logging.DEBUG)  # default seems to be INFO
    else:
        logger.setLevel(logging.INFO)

    Dice.seed(0)  # initial seeding of dice

    # Run server or client
    mExec = MainExec()
    if gConfig.run_tdengine:  # run server
        mExec.runService()
        return None
    return mExec.runClient()
3280

3281 3282 3283 3284 3285 3286 3287 3288 3289 3290 3291 3292 3293 3294 3295 3296 3297 3298 3299 3300 3301
class Container():
    """
    Minimal attribute-style holder for shared objects ("micky-mouse" dependency
    injection). Only the names listed in _propertyList may be set or read.
    """
    _propertyList = {'defTdeInstance'}  # the only property names allowed

    def __init__(self):
        self._cargo = {} # No cargo at the beginning

    def _verifyValidProperty(self, name):
        if name not in self._propertyList:
            raise CrashGenError("Invalid container property: {}".format(name))

    # Called for an attribute, when other mechanisms fail (compare to  __getattribute__)
    def __getattr__(self, name):
        """
        Look up a stored property.

        :raises CrashGenError: if name is not an allowed property
        :raises AttributeError: if name is allowed but has not been set yet
        """
        self._verifyValidProperty(name)
        try:
            return self._cargo[name] # just a simple lookup
        except KeyError:
            # Follow the attribute protocol so hasattr() and
            # getattr(obj, name, default) work for unset properties.
            raise AttributeError(
                "Container property not set yet: {}".format(name)) from None

    def __setattr__(self, name, value):
        if name == '_cargo' : # reserved vars
            super().__setattr__(name, value)
            return
        self._verifyValidProperty(name)
        self._cargo[name] = value
S
Shuduo Sang 已提交
3302

3303
if __name__ == "__main__":
    # Shared dependency-injection holder. It must exist as a module global
    # before main() runs, because ClientManager.run() stores the default
    # TdeInstance in it.
    gContainer = Container()

    # main() returns the client's exit status (possibly None, e.g. in service
    # mode); sys.exit(None) is reported as success.
    exitCode = main()
    sys.exit(exitCode)