crash_gen.py 74.1 KB
Newer Older
1
#!/usr/bin/python3.7
S
Steven Li 已提交
2 3 4 5 6 7 8 9 10 11 12 13
###################################################################
#           Copyright (c) 2016 by TAOS Technologies, Inc.
#                     All rights reserved.
#
#  This file is proprietary and confidential to TAOS Technologies.
#  No part of this file may be reproduced, stored, transmitted,
#  disclosed or used in any form or by any means other than as
#  expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-
14 15
from __future__ import annotations  # For type hinting before definition, ref: https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel    

S
Steven Li 已提交
16
import sys
17
import os
18 19
import io
import signal
20
import traceback
21 22 23 24
# Refuse to run on a Python 2 interpreter; this script needs Python 3.
if sys.version_info < (3,):
    raise Exception("Must be using Python 3")

S
Steven Li 已提交
25
import getopt
26
import argparse
27
import copy
S
Steven Li 已提交
28 29 30

import threading
import random
31
import time
S
Steven Li 已提交
32
import logging
33
import datetime
34
import textwrap
35 36
import requests
from requests.auth import HTTPBasicAuth
S
Steven Li 已提交
37

38
from typing import List
39
from typing import Dict
40
from typing import Set
41 42
from typing import IO
from queue import Queue, Empty
43

S
Steven Li 已提交
44 45 46 47 48
from util.log import *
from util.dnodes import *
from util.cases import *
from util.sql import *

49
import crash_gen
S
Steven Li 已提交
50 51
import taos

52
# Global variables -- deliberately kept to a small number.

# Command-line/environment configuration. This empty Namespace is only a
# placeholder; it is replaced later with the real configuration.
gConfig: argparse.Namespace = argparse.Namespace()

# Module-wide logger; remains None until it is assigned later on.
logger = None
S
Steven Li 已提交
58

59 60
def runThread(wt: WorkerThread):
    """Target callable for each worker's ``threading.Thread``.

    Simply hands control to the worker's own ``run`` loop; returns None.
    """
    worker = wt
    worker.run()
61

62 63 64 65 66 67 68 69
class CrashGenError(Exception):
    """Error raised by the crash generator for unexpected conditions.

    Attributes:
        msg: human-readable description; may be None.
        errno: optional numeric error code supplied by the raiser.
    """

    def __init__(self, msg=None, errno=None):
        # Hand the message to Exception so ``args``/``repr`` reflect it.
        super().__init__(msg)
        self.msg = msg
        self.errno = errno

    def __str__(self):
        # __str__ must return a str: returning None (msg=None) would make
        # str(err), print(err) and logging of the error raise TypeError.
        return "" if self.msg is None else str(self.msg)

S
Steven Li 已提交
70
class WorkerThread:
    """A single worker in the crash generator's thread pool.

    The worker owns a ``threading.Thread`` whose target is ``runThread``. In
    each step it waits at the coordinator's shared step barrier, then at its
    own per-thread step gate until the main thread taps it, and then fetches
    and executes one task obtained from the ThreadCoordinator.

    When ``gConfig.per_thread_db_connection`` is set the worker owns a DbConn
    (native or REST, per ``gConfig.connector_type``); otherwise SQL goes
    through the DbManager's connection.
    """

    def __init__(self, pool: ThreadPool, tid, tc: ThreadCoordinator):  # note: main thread context!
        self._pool = pool
        self._tid = tid
        self._tc = tc  # type: ThreadCoordinator
        self._thread = threading.Thread(target=runThread, args=(self,))
        self._stepGate = threading.Event()

        # Let us have a DB connection of our own
        if gConfig.per_thread_db_connection:  # type: ignore
            self._dbConn = DbConn.createNative() if (gConfig.connector_type == 'native') else DbConn.createRest()

        self._dbInUse = False  # if "use db" was executed already

    def logDebug(self, msg):
        logger.debug("    TRD[{}] {}".format(self._tid, msg))

    def logInfo(self, msg):
        logger.info("    TRD[{}] {}".format(self._tid, msg))

    def dbInUse(self):
        """Return True once "use db" has been issued during the current step."""
        return self._dbInUse

    def useDb(self):
        """Issue "use db" on this worker's connection, at most once per step."""
        if not self._dbInUse:
            self.execSql("use db")
        self._dbInUse = True

    def getTaskExecutor(self):
        return self._tc.getTaskExecutor()

    def start(self):
        self._thread.start()  # AFTER the thread is recorded

    def run(self):
        """Thread body: open the per-thread connection, run the step loop, clean up."""
        # initialization after thread starts, in the thread context
        logger.info("Starting to run thread: {}".format(self._tid))

        if gConfig.per_thread_db_connection:  # type: ignore
            logger.debug("Worker thread openning database connection")
            self._dbConn.open()

        try:
            self._doTaskLoop()
        finally:
            # clean up -- also when a task raised, so the connection is not leaked
            if gConfig.per_thread_db_connection:  # type: ignore
                self._dbConn.close()

    def _doTaskLoop(self):
        """Per-step loop: barrier, gate, then fetch/execute/record one task.

        Exits once the coordinator reports it is no longer running, which is
        checked right after the gate has been crossed.
        """
        tc = self._tc  # Thread Coordinator, the overall master
        while True:
            tc.crossStepBarrier()  # shared barrier first, INCLUDING the last one
            logger.debug("[TRD] Worker thread [{}] exited barrier...".format(self._tid))
            self.crossStepGate()   # then per-thread gate, after being tapped
            logger.debug("[TRD] Worker thread [{}] exited step gate...".format(self._tid))
            if not tc.isRunning():
                logger.debug("[TRD] Thread Coordinator not running any more, worker thread now stopping...")
                break

            # Fetch a task from the Thread Coordinator
            logger.debug("[TRD] Worker thread [{}] about to fetch task".format(self._tid))
            task = tc.fetchTask()

            # Execute such a task
            logger.debug("[TRD] Worker thread [{}] about to execute task: {}".format(self._tid, task.__class__.__name__))
            task.execute(self)
            tc.saveExecutedTask(task)
            logger.debug("[TRD] Worker thread [{}] finished executing task".format(self._tid))

            self._dbInUse = False  # there may be changes between steps

    def verifyThreadSelf(self):  # ensure we are called by this own thread
        if threading.get_ident() != self._thread.ident:
            raise RuntimeError("Unexpectly called from other threads")

    def verifyThreadMain(self):  # ensure we are called by the main thread
        if threading.get_ident() != threading.main_thread().ident:
            raise RuntimeError("Unexpectly called from other threads")

    def verifyThreadAlive(self):
        if not self._thread.is_alive():
            raise RuntimeError("Unexpected dead thread")

    # A gate is different from a barrier in that a thread needs to be "tapped"
    def crossStepGate(self):
        """Block (in this worker's own thread) until the main thread taps the gate."""
        self.verifyThreadAlive()
        self.verifyThreadSelf()  # only allowed by ourselves

        # Wait again at the "gate", waiting to be "tapped"
        logger.debug("[TRD] Worker thread {} about to cross the step gate".format(self._tid))
        self._stepGate.wait()
        self._stepGate.clear()  # re-arm the gate for the next step

    def tapStepGate(self):  # give it a tap, release the thread waiting there
        """Release this worker from its gate; only the main thread may call this."""
        self.verifyThreadAlive()
        self.verifyThreadMain()  # only allowed for main thread

        logger.debug("[TRD] Tapping worker thread {}".format(self._tid))
        self._stepGate.set()  # wake up!
        time.sleep(0)  # let the released thread run a bit

    def execSql(self, sql):  # TODO: expose DbConn directly
        """Execute *sql* on the connection chosen by getDbConn() and return its result."""
        return self.getDbConn().execute(sql)

    def querySql(self, sql):  # TODO: expose DbConn directly
        """Run query *sql* on the connection chosen by getDbConn() and return its result."""
        return self.getDbConn().query(sql)

    def getQueryResult(self):
        """Return the last query result held by the connection chosen by getDbConn()."""
        return self.getDbConn().getQueryResult()

    def getDbConn(self):
        """Per-thread connection when configured, otherwise the DbManager's connection."""
        if gConfig.per_thread_db_connection:
            return self._dbConn
        else:
            return self._tc.getDbManager().getDbConn()
211

212
# The coordinator of all worker threads, mostly running in main thread
213
class ThreadCoordinator:
    """Runs the worker threads in lock-step; mostly executes in the main thread.

    In each step the main thread crosses the shared barrier together with all
    workers, does housekeeping while the workers are parked at their gates
    (checks the executed tasks, transitions the DB state machine), creates a
    fresh TaskExecutor and then taps every worker so each runs one task.
    """

    def __init__(self, pool: ThreadPool, dbManager):
        self._curStep = -1  # first step is 0
        self._pool = pool
        self._te = None  # prepare for every new step
        self._dbManager = dbManager
        self._executedTasks: List[Task] = []  # in a given step
        self._lock = threading.RLock()  # sync access for a few things

        # All workers plus the main thread meet at this barrier every step
        self._stepBarrier = threading.Barrier(self._pool.numThreads + 1)
        self._execStats = ExecutionStats()
        self._runStatus = MainExec.STATUS_RUNNING

    def getTaskExecutor(self):
        return self._te

    def getDbManager(self) -> DbManager:
        return self._dbManager

    def crossStepBarrier(self):
        self._stepBarrier.wait()

    def requestToStop(self):
        """Ask the step loop to stop after the current step; recorded as a failure."""
        self._runStatus = MainExec.STATUS_STOPPING
        self._execStats.registerFailure("User Interruption")

    def run(self):
        """Start all workers and drive them through up to ``gConfig.max_steps`` steps.

        Stops early on an aborted task, a broken DB connection ('network
        unavailable') or a stop request; other ProgrammingErrors propagate.
        """
        self._pool.createAndStartThreads(self)

        # Coordinate all threads step by step
        self._curStep = -1  # not started yet
        maxSteps = gConfig.max_steps  # type: ignore
        self._execStats.startExec()  # start the stop watch
        transitionFailed = False
        hasAbortedTask = False
        while (self._curStep < maxSteps - 1 and  # maxStep==10, last curStep should be 9
               (not transitionFailed) and
               (self._runStatus == MainExec.STATUS_RUNNING) and
               (not hasAbortedTask)):

            if not gConfig.debug:
                print(".", end="", flush=True)  # print this only if we are not in debug mode
            logger.debug("[TRD] Main thread going to sleep")

            # Now main thread (that's us) is ready to enter a step
            self.crossStepBarrier()  # let other threads go past the pool barrier, but wait at the thread gate
            self._stepBarrier.reset()  # Other worker threads should now be at the "gate"

            # At this point all threads are past the overall "barrier" and before the per-thread "gate".
            # We use this period to do house keeping work, when all worker threads are QUIET.
            hasAbortedTask = False
            for task in self._executedTasks:
                if task.isAborted():
                    print("Task aborted: {}".format(task))
                    hasAbortedTask = True
                    break

            if hasAbortedTask:  # do transition only if tasks are error free
                self._execStats.registerFailure("Aborted Task Encountered")
            else:
                try:
                    sm = self._dbManager.getStateMachine()
                    logger.debug("[STT] starting transitions")
                    sm.transition(self._executedTasks)  # at end of step, transition the DB state
                    logger.debug("[STT] transition ended")
                    # Due to limitation (or maybe not) of the Python library, we cannot share connections across threads
                    if sm.hasDatabase():
                        for t in self._pool.threadList:
                            logger.debug("[DB] use db for all worker threads")
                            t.useDb()
                except taos.error.ProgrammingError as err:
                    if err.msg == 'network unavailable':  # broken DB connection
                        logger.info("DB connection broken, execution failed")
                        # print_exc shows where the error was raised; print_stack
                        # would only show where we are catching it.
                        traceback.print_exc()
                        transitionFailed = True
                        self._te = None  # Not running any more
                        self._execStats.registerFailure("Broken DB Connection")
                        # Don't `continue`: all threads still need to be tapped below, so they can see we stopped
                    else:
                        raise

            self.resetExecutedTasks()  # clear the tasks after we are done

            # Get ready for next step
            logger.debug("<-- Step {} finished".format(self._curStep))
            self._curStep += 1  # we are about to get into next step. TODO: race condition here!
            logger.debug("\r\n\n--> Step {} starts with main thread waking up".format(self._curStep))  # Now not all threads had time to go to sleep

            # A new TE for the new step
            if not transitionFailed:  # only if not failed
                self._te = TaskExecutor(self._curStep)

            logger.debug("[TRD] Main thread waking up at step {}, tapping worker threads".format(self._curStep))  # Now not all threads had time to go to sleep
            self.tapAllThreads()  # Worker threads will wake up at this point, and each execute it's own task

        logger.debug("Main thread ready to finish up...")
        if not transitionFailed:  # only in regular situations
            self.crossStepBarrier()  # Cross it one last time, after all threads finish
            self._stepBarrier.reset()
            logger.debug("Main thread in exclusive zone...")
            self._te = None  # No more executor, time to end
            logger.debug("Main thread tapping all threads one last time...")
            self.tapAllThreads()  # Let the threads run one last time

        logger.debug("Main thread joining all threads")
        self._pool.joinAll()  # Get all threads to finish
        logger.info("\nAll worker threads finished")
        self._execStats.endExec()

    def printStats(self):
        self._execStats.printStats()

    def tapAllThreads(self):
        """Tap every worker's step gate, in an order randomized with Dice."""
        wakeSeq = []
        for i in range(self._pool.numThreads):  # generate a random sequence
            if Dice.throw(2) == 1:
                wakeSeq.append(i)
            else:
                wakeSeq.insert(0, i)
        logger.debug("[TRD] Main thread waking up worker threads: {}".format(str(wakeSeq)))
        # TODO: set dice seed to a deterministic value
        for i in wakeSeq:
            self._pool.threadList[i].tapStepGate()  # TODO: maybe a bit too deep?!
            time.sleep(0)  # yield

    def isRunning(self):
        """True while a TaskExecutor exists for the current step."""
        return self._te is not None

    def fetchTask(self) -> Task:
        """Create a task whose type the DB state machine picks for the current state.

        Raises:
            RuntimeError: if the coordinator is not running.
        """
        if not self.isRunning():  # no task
            raise RuntimeError("Cannot fetch task when not running")
        taskType = self.getDbManager().getStateMachine().pickTaskType()  # pick a task type for current state
        return taskType(self.getDbManager(), self._execStats)  # create a task from it

    def resetExecutedTasks(self):
        self._executedTasks = []  # should be under single thread

    def saveExecutedTask(self, task):
        """Record a finished task; called concurrently by workers, hence the lock."""
        with self._lock:
            self._executedTasks.append(task)
366 367

# A pool of worker threads that run in lock-step, one task per thread per step.
368
class ThreadPool:
    """Holds the WorkerThreads that a ThreadCoordinator drives in lock-step."""

    def __init__(self, numThreads, maxSteps):
        self.numThreads = numThreads
        self.maxSteps = maxSteps
        # Internal bookkeeping
        self.curStep = 0
        self.threadList = []  # type: List[WorkerThread]

    def createAndStartThreads(self, tc: ThreadCoordinator):
        """Create ``numThreads`` workers bound to *tc*, recording then starting each.

        Each worker is appended to ``threadList`` before it is started; started
        workers should block immediately, before step 0.
        """
        for workerId in range(self.numThreads):
            worker = WorkerThread(self, workerId, tc)
            self.threadList.append(worker)
            worker.start()  # start, but should block immediately before step 0

    def joinAll(self):
        """Wait until every worker thread has terminated."""
        for worker in self.threadList:
            logger.debug("Joining thread...")
            worker._thread.join()

388 389
# A queue of contiguous POSITIVE integers, used by DbManager to generate consecutive numbers
# for new table names
S
Steven Li 已提交
390 391
class LinearQueue():
    """Contiguous range ``[firstIndex..lastIndex]`` of positive integers.

    ``push`` appends a new index at the tail and marks it in use; ``pop``
    removes the head only if it is not in use. ``inUse`` tracks indexes that
    are currently allocated. Methods that mutate state hold an RLock.
    """

    def __init__(self):
        self.firstIndex = 1  # 1st ever element
        self.lastIndex = 0   # first > last means empty
        self._lock = threading.RLock()  # our functions may call each other
        self.inUse = set()  # the indexes that are in use right now

    def toText(self):
        return "[{}..{}], in use: {}".format(self.firstIndex, self.lastIndex, self.inUse)

    def push(self):
        """Add a new (largest) index at the tail, mark it in use and return it."""
        with self._lock:
            self.lastIndex += 1
            self.allocate(self.lastIndex)
            return self.lastIndex

    def pop(self):
        """Remove and return the head index.

        Returns False when the queue is empty or the head is still in use.
        """
        with self._lock:
            if self.isEmpty():
                return False  # TODO: None?
            index = self.firstIndex
            if index in self.inUse:
                return False
            self.firstIndex += 1
            return index

    def isEmpty(self):
        return self.firstIndex > self.lastIndex

    def popIfNotEmpty(self):
        """Like pop(), but returns 0 (not False) when the queue is empty."""
        with self._lock:
            if self.isEmpty():
                return 0
            return self.pop()

    def allocate(self, i):
        """Mark index *i* in use; raises RuntimeError if it already is."""
        with self._lock:
            if i in self.inUse:
                raise RuntimeError("Cannot re-use same index in queue: {}".format(i))
            self.inUse.add(i)

    def release(self, i):
        """Mark index *i* as no longer in use (KeyError if it was not in use)."""
        with self._lock:
            self.inUse.remove(i)  # KeyError possible, TODO: why?

    def size(self):
        return self.lastIndex + 1 - self.firstIndex

    def pickAndAllocate(self):
        """Randomly pick an index that is not in use, allocate it and return it.

        Returns None when the queue is empty or every index is already in use.
        """
        with self._lock:
            # Enumerate the free indexes instead of probing at random: random
            # probing could give up and return None although a free index
            # existed, and it burned ~10*size dice throws whenever the queue
            # was fully allocated. The emptiness check is now also under the lock.
            free = [i for i in range(self.firstIndex, self.lastIndex + 1)
                    if i not in self.inUse]
            if not free:
                return None
            ret = free[Dice.throwRange(0, len(free))]  # upper bound exclusive
            self.allocate(ret)
            return ret

class DbConn:
    """Base class for a database connection (native client or REST API).

    Obtain a concrete instance through ``create``/``createNative``/
    ``createRest``. The transport-specific methods at the bottom raise
    here and are overridden by DbConnNative and DbConnRest.
    """
    TYPE_NATIVE = "native-c"
    TYPE_REST = "rest-api"
    TYPE_INVALID = "invalid"

    @classmethod
    def create(cls, connType):
        """Return a new, not yet opened connection of type *connType*.

        Raises:
            RuntimeError: if *connType* is neither TYPE_NATIVE nor TYPE_REST.
        """
        if connType == cls.TYPE_NATIVE:
            return DbConnNative()
        elif connType == cls.TYPE_REST:
            return DbConnRest()
        else:
            raise RuntimeError("Unexpected connection type: {}".format(connType))

    @classmethod
    def createNative(cls):
        return cls.create(cls.TYPE_NATIVE)

    @classmethod
    def createRest(cls):
        return cls.create(cls.TYPE_REST)

    def __init__(self):
        self.isOpen = False
        self._type = self.TYPE_INVALID  # subclasses set their own type

    def open(self):
        """Open the connection (via openByType); raises if already open."""
        if self.isOpen:
            raise RuntimeError("Cannot re-open an existing DB connection")

        # below implemented by child classes
        self.openByType()

        logger.debug("[DB] data connection opened, type = {}".format(self._type))
        self.isOpen = True

    def resetDb(self):  # reset the whole database, etc.
        """Drop database ``db`` if it exists; the connection must be open."""
        if not self.isOpen:
            raise RuntimeError("Cannot reset database until connection is open")
        self.execute('drop database if exists db')
        logger.debug("Resetting DB, dropped database")

    def queryScalar(self, sql) -> int:
        return self._queryAny(sql)

    def queryString(self, sql) -> str:
        return self._queryAny(sql)

    def _queryAny(self, sql):
        """Run *sql*, require a single-row single-column result, return that cell."""
        if not self.isOpen:
            raise RuntimeError("Cannot query database until connection is open")
        nRows = self.query(sql)
        if nRows != 1:
            raise RuntimeError("Unexpected result for query: {}, rows = {}".format(sql, nRows))
        if self.getResultRows() != 1 or self.getResultCols() != 1:
            raise RuntimeError("Unexpected result set for query: {}".format(sql))
        return self.getQueryResult()[0][0]

    # Transport-specific operations, overridden by every concrete subclass.
    def execute(self, sql):
        raise RuntimeError("Unexpected execution, should be overriden")

    def query(self, sql):
        raise RuntimeError("Unexpected execution, should be overriden")

    def openByType(self):
        raise RuntimeError("Unexpected execution, should be overriden")

    def close(self):
        raise RuntimeError("Unexpected execution, should be overriden")

    def getQueryResult(self):
        raise RuntimeError("Unexpected execution, should be overriden")

    def getResultRows(self):
        raise RuntimeError("Unexpected execution, should be overriden")

    def getResultCols(self):
        raise RuntimeError("Unexpected execution, should be overriden")

# Sample: curl -u root:taosdata -d "show databases" localhost:6020/rest/sql
class DbConnRest(DbConn):
    """DbConn that sends each SQL statement to the TDengine REST endpoint.

    No persistent connection is kept: every statement is one HTTP POST with
    basic auth (root/taosdata), and the decoded JSON of the last successful
    reply is stored as the current result.
    """

    def __init__(self):
        super().__init__()
        self._type = self.TYPE_REST
        self._url = "http://localhost:6020/rest/sql"  # fixed for now
        self._result = None  # decoded JSON of the last successful reply

    def openByType(self):  # Open connection
        pass  # do nothing, always open

    def close(self):
        if not self.isOpen:
            raise RuntimeError("Cannot clean up database until connection is open")
        # Do nothing for REST
        logger.debug("[DB] REST Database connection closed")
        self.isOpen = False

    def _doSql(self, sql):
        """POST *sql*; store the reply and return its 'rows' count (0 if absent).

        Raises:
            taos.error.ProgrammingError: the server reported status 'error'.
            RuntimeError: the reply lacks a status/code or has an unknown status.
        """
        r = requests.post(self._url,
                          data=sql,
                          auth=HTTPBasicAuth('root', 'taosdata'))
        rj = r.json()
        # Sanity check for the "Json Result"
        if 'status' not in rj:
            raise RuntimeError("No status in REST response")

        if rj['status'] == 'error':  # clearly reported error
            if 'code' not in rj:  # error without code
                raise RuntimeError("REST error return without code")
            errno = rj['code']  # May need to massage this in the future
            # 'desc' is not guaranteed to be present; don't turn its absence into a KeyError
            raise taos.error.ProgrammingError(rj.get('desc', "REST error without description"), errno)

        if rj['status'] != 'succ':  # better be this
            raise RuntimeError("Unexpected REST return status: {}".format(rj['status']))

        nRows = rj['rows'] if ('rows' in rj) else 0
        self._result = rj
        return nRows

    def execute(self, sql):
        if not self.isOpen:
            raise RuntimeError("Cannot execute database commands until connection is open")
        logger.debug("[SQL-REST] Executing SQL: {}".format(sql))
        nRows = self._doSql(sql)
        logger.debug("[SQL-REST] Execution Result, nRows = {}, SQL = {}".format(nRows, sql))
        return nRows

    def query(self, sql):  # return rows affected
        return self.execute(sql)

    def getQueryResult(self):
        return self._result['data']

    def getResultRows(self):
        """Number of rows in the last result (length of its 'data' array)."""
        return len(self.getQueryResult())

    def getResultCols(self):
        """Number of columns in the last result.

        Uses the 'head' column list when present (assumed per the TDengine
        REST reply format -- confirm for the server version in use), else
        the width of the first data row.
        """
        if 'head' in self._result:
            return len(self._result['head'])
        data = self.getQueryResult()
        return len(data[0]) if data else 0
    
class DbConnNative(DbConn):
    """DbConn backed by the native ``taos`` client, accessed via a TDSql helper."""

    def __init__(self):
        super().__init__()
        # Was TYPE_REST, which mislabelled native connections (e.g. in the open log)
        self._type = self.TYPE_NATIVE
        self._conn = None
        self._cursor = None
        self._tdSql = None  # created in openByType()

    def openByType(self):  # Open connection
        cfgPath = "../../build/test/cfg"
        self._conn = taos.connect(host="127.0.0.1", config=cfgPath)  # TODO: make configurable
        self._cursor = self._conn.cursor()

        # Get the connection/cursor ready
        self._cursor.execute('reset query cache')

        # Wrap the cursor in the TDSql helper used by execute()/query()
        self._tdSql = TDSql()
        self._tdSql.init(self._cursor)

    def close(self):
        """Close the TDSql helper and the underlying taos connection."""
        if not self.isOpen:
            raise RuntimeError("Cannot clean up database until connection is open")
        self._tdSql.close()
        # TDSql was only handed the cursor, so the connection itself must be
        # closed separately or it leaks.
        self._conn.close()
        logger.debug("[DB] Database connection closed")
        self.isOpen = False

    def execute(self, sql):
        if not self.isOpen:
            raise RuntimeError("Cannot execute database commands until connection is open")
        logger.debug("[SQL] Executing SQL: {}".format(sql))
        nRows = self._tdSql.execute(sql)
        logger.debug("[SQL] Execution Result, nRows = {}, SQL = {}".format(nRows, sql))
        return nRows

    def query(self, sql):  # return rows affected
        """Run *sql* and return the row count; the rows are kept in TDSql."""
        if not self.isOpen:
            raise RuntimeError("Cannot query database until connection is open")
        logger.debug("[SQL] Executing SQL: {}".format(sql))
        nRows = self._tdSql.query(sql)
        logger.debug("[SQL] Query Result, nRows = {}, SQL = {}".format(nRows, sql))
        return nRows

    def getQueryResult(self):
        return self._tdSql.queryResult

    def getResultRows(self):
        return self._tdSql.queryRows

    def getResultCols(self):
        return self._tdSql.queryCols
654 655 656

    
class AnyState:
    """Abstract description of a database state in the crash generator.

    Subclasses implement ``getInfo()`` returning a list laid out by the
    position constants below: slot STATE_VAL_IDX holds one of the STATE_*
    values and the CAN_* slots hold flags telling which operations are
    permitted. Subclasses also implement ``verifyTasksToState``.
    """

    # State values
    STATE_INVALID    = -1
    STATE_EMPTY      = 0  # nothing there, not even a DB
    STATE_DB_ONLY    = 1  # we have a DB, but nothing else
    STATE_TABLE_ONLY = 2  # we have a table, but totally empty
    STATE_HAS_DATA   = 3  # we have some data in the table
    # Display names indexed by (state value + 1), so STATE_INVALID lands in slot 0
    _stateNames = ["Invalid", "Empty", "DB_Only", "Table_Only", "Has_Data"]

    # Positions inside the list returned by getInfo()
    STATE_VAL_IDX = 0
    CAN_CREATE_DB = 1
    CAN_DROP_DB = 2
    CAN_CREATE_FIXED_SUPER_TABLE = 3
    CAN_DROP_FIXED_SUPER_TABLE = 4
    CAN_ADD_DATA = 5
    CAN_READ_DATA = 6

    def __init__(self):
        # The subclass's info list is captured once, at construction
        self._info = self.getInfo()

    def __str__(self):
        return self._stateNames[self._infoAt(self.STATE_VAL_IDX) + 1]

    def _infoAt(self, pos):
        """Entry *pos* of the info list captured in __init__."""
        return self._info[pos]

    def getInfo(self):
        raise RuntimeError("Must be overriden by child classes")

    def equals(self, other):
        """Compare state values with an int STATE_* value or another AnyState."""
        if isinstance(other, int):
            rhs = other
        elif isinstance(other, AnyState):
            rhs = other.getValIndex()
        else:
            raise RuntimeError("Unexpected comparison, type = {}".format(type(other)))
        return self.getValIndex() == rhs

    def verifyTasksToState(self, tasks, newState):
        raise RuntimeError("Must be overriden by child classes")

    def getValIndex(self):
        return self._infoAt(self.STATE_VAL_IDX)

    def getValue(self):
        return self._infoAt(self.STATE_VAL_IDX)

    def canCreateDb(self):
        return self._infoAt(self.CAN_CREATE_DB)

    def canDropDb(self):
        return self._infoAt(self.CAN_DROP_DB)

    def canCreateFixedSuperTable(self):
        return self._infoAt(self.CAN_CREATE_FIXED_SUPER_TABLE)

    def canDropFixedSuperTable(self):
        return self._infoAt(self.CAN_DROP_FIXED_SUPER_TABLE)

    def canAddData(self):
        return self._infoAt(self.CAN_ADD_DATA)

    def canReadData(self):
        return self._infoAt(self.CAN_READ_DATA)

    def assertAtMostOneSuccess(self, tasks, cls):
        """Raise RuntimeError as soon as a second successful *cls* task is seen."""
        successes = 0
        for task in (t for t in tasks if isinstance(t, cls)):
            if not task.isSuccess():
                continue
            successes += 1
            if successes >= 2:
                raise RuntimeError("Unexpected more than 1 success with task: {}".format(cls))

    def assertIfExistThenSuccess(self, tasks, cls):
        """If any *cls* task is present, require at least one of them to have succeeded."""
        matching = [t for t in tasks if isinstance(t, cls)]
        successes = sum(1 for t in matching if t.isSuccess())
        if matching and successes <= 0:
            raise RuntimeError("Unexpected zero success for task: {}".format(cls))

    def assertNoTask(self, tasks, cls):
        """Raise CrashGenError if any *cls* task is present."""
        if any(isinstance(t, cls) for t in tasks):
            raise CrashGenError("This task: {}, is not expected to be present, given the success/failure of others".format(cls.__name__))

    def assertNoSuccess(self, tasks, cls):
        """Raise RuntimeError at the first successful *cls* task."""
        for task in tasks:
            if isinstance(task, cls) and task.isSuccess():
                raise RuntimeError("Unexpected successful task: {}".format(cls))

    def hasSuccess(self, tasks, cls):
        """True if some *cls* task succeeded."""
        return any(isinstance(t, cls) and t.isSuccess() for t in tasks)

    def hasTask(self, tasks, cls):
        """True if some task is an instance of *cls*."""
        return any(isinstance(t, cls) for t in tasks)

class StateInvalid(AnyState):
    """A state from which no database operation is permitted.

    Does not override verifyTasksToState().
    """

    def getInfo(self):
        return [
            self.STATE_INVALID,
            False, False, # can create/drop Db
            False, False, # can create/drop fixed table
            False, False, # can insert/read data with fixed table
        ]

class StateEmpty(AnyState):
    """No database exists; creating the DB is the only permitted operation."""

    def getInfo(self):
        return [
            self.STATE_EMPTY,
            True, False, # can create/drop Db
            False, False, # can create/drop fixed table
            False, False, # can insert/read data with fixed table
        ]

    def verifyTasksToState(self, tasks, newState):
        """Check that the tasks run from EMPTY are consistent with reaching `newState`."""
        if self.hasSuccess(tasks, TaskCreateDb): # at EMPTY, a DB was created successfully...
            if not self.hasTask(tasks, TaskDropDb): # ...and nothing tried to drop it,
                self.assertAtMostOneSuccess(tasks, TaskCreateDb) # so at most one creation may succeed. TODO: compare numbers

class StateDbOnly(AnyState):
    """The database exists but contains no tables."""

    def getInfo(self):
        return [
            self.STATE_DB_ONLY,
            False, True, # can create/drop Db
            True, False, # can create/drop fixed table
            False, False, # can insert/read data with fixed table
        ]

    def verifyTasksToState(self, tasks, newState):
        """Check the tasks run from DB_ONLY against the observed `newState`."""
        if not self.hasTask(tasks, TaskCreateDb):
            self.assertAtMostOneSuccess(tasks, TaskDropDb) # only if we don't create any more
        self.assertIfExistThenSuccess(tasks, TaskDropDb)
        # Super table creation/dropping and data insertion are deliberately not
        # verified here: "at most one success" style assertions do not hold in
        # massively parallel runs. TODO: revisit verification of those outcomes.

class StateSuperTableOnly(AnyState):
    """The fixed super table exists but holds no rows."""

    def getInfo(self):
        return [
            self.STATE_TABLE_ONLY,
            False, True, # can create/drop Db
            False, True, # can create/drop fixed table
            True, True,  # can insert/read data with fixed table
        ]

    def verifyTasksToState(self, tasks, newState):
        """Check the tasks run from SUPER_TABLE_ONLY against `newState`.

        Currently no assertion is made. TODO: need to revamp!!
        """
        # NOTE(review): previously, when a TaskDropSuperTable succeeded, this method
        # called self.hasSuccess(tasks, TaskCreateSuperTable) and discarded the result,
        # so nothing was ever checked. The no-op call was removed. A real assertion
        # (e.g. "the table was re-created") only holds if newState still has the
        # super table. TODO: add it conditioned on newState.
        return

class StateHasData(AnyState):
    """The fixed super table exists and holds data."""

    def getInfo(self):
        return [
            self.STATE_HAS_DATA,
            False, True, # can create/drop Db
            False, True, # can create/drop fixed table
            True, True,  # can insert/read data with fixed table
        ]

    def verifyTasksToState(self, tasks, newState):
        """Check the tasks run from HAS_DATA against the observed `newState`."""
        if newState.equals(AnyState.STATE_EMPTY): # the DB is gone
            if not self.hasTask(tasks, TaskCreateDb):
                self.assertAtMostOneSuccess(tasks, TaskDropDb) # TODO: dicy
        elif newState.equals(AnyState.STATE_DB_ONLY): # DB kept, tables gone
            if not self.hasTask(tasks, TaskCreateDb): # without a create_db task...
                self.assertNoTask(tasks, TaskDropDb) # ...we must NOT have a drop_db task
            # self.assertAtMostOneSuccess(tasks, DropFixedSuperTableTask) # TODO: dicy
        else: # STATE_HAS_DATA, or any other state (SUPER_TABLE_ONLY is not checked separately)
            if not self.hasTask(tasks, TaskCreateDb): # only if we didn't create one
                self.assertNoTask(tasks, TaskDropDb) # we shouldn't have dropped it
            if not self.hasTask(tasks, TaskCreateSuperTable): # if we didn't create the table
                self.assertNoTask(tasks, TaskDropSuperTable) # we should not have a task that drops it

class StateMechine:
    """Tracks the database state and picks the next task type to run.

    The state is re-discovered by querying the database after each step
    (_findCurrentState), and the step's tasks are then verified against the
    observed transition.
    """

    def __init__(self, dbConn):
        self._dbConn = dbConn
        self._curState = self._findCurrentState() # starting state
        # Transition target probabilities, indexed with the value index of
        # STATE_EMPTY, STATE_DB_ONLY, etc. (see pickTaskType).
        self._stateWeights = [1, 3, 5, 15]

    def getCurrentState(self):
        return self._curState

    def hasDatabase(self):
        return self._curState.canDropDb()  # ha, can drop DB means it has one

    # May be slow, use cautiously...
    def getTaskTypes(self): # those that can run (directly/indirectly) from the current state
        """Return task classes runnable now, or after one state-changing task.

        :raises RuntimeError: if no task type qualifies for the current state.
        """
        allTaskClasses = StateTransitionTask.__subclasses__() # all state transition tasks
        firstTaskTypes = [tc for tc in allTaskClasses if tc.canBeginFrom(self._curState)]
        # Now add the INDIRECT ones: tasks that can begin from the end state of a direct one.
        taskTypes = firstTaskTypes.copy() # have to have these
        for task1 in firstTaskTypes:
            endState = task1.getEndState()
            if endState is None: # does not change state, nothing new reachable
                continue
            for tc in allTaskClasses:
                # Check taskTypes (not only firstTaskTypes) so a class reachable from
                # several end states is listed, and therefore weighted, just once.
                if tc.canBeginFrom(endState) and tc not in taskTypes:
                    taskTypes.append(tc)

        if not taskTypes:
            raise RuntimeError("No suitable task types found for state: {}".format(self._curState))
        logger.debug("[OPS] Tasks found for state {}: {}".format(self._curState, [t.__name__ for t in taskTypes]))
        return taskTypes

    def _findCurrentState(self):
        """Query the database and return the state object describing it.

        NOTE(review): assumes dbConn.query() returns the number of result rows.
        """
        dbc = self._dbConn
        ts = time.time() # we use this to debug how fast/slow it is to do the various queries to find the current DB state
        if dbc.query("show databases") == 0 : # no database?!
            logger.debug("[STT] empty database found, between {} and {}".format(ts, time.time()))
            return StateEmpty()
        dbc.execute("use db") # did not do this when openning connection, and this is NOT the worker thread, which does this on their own
        if dbc.query("show tables") == 0 : # no tables
            logger.debug("[STT] DB_ONLY found, between {} and {}".format(ts, time.time()))
            return StateDbOnly()
        if dbc.query("SELECT * FROM db.{}".format(DbManager.getFixedSuperTableName()) ) == 0 : # no regular tables
            logger.debug("[STT] SUPER_TABLE_ONLY found, between {} and {}".format(ts, time.time()))
            return StateSuperTableOnly()
        else: # has actual tables
            logger.debug("[STT] HAS_DATA found, between {} and {}".format(ts, time.time()))
            return StateHasData()

    def transition(self, tasks):
        """Re-discover the DB state after a step and verify `tasks` explain the change."""
        if len(tasks) == 0: # before 1st step, or otherwise empty
            logger.debug("[STT] Starting State: {}".format(self._curState))
            return # do nothing

        self._dbConn.execute("show dnodes") # this should show up in the server log, separating steps

        # Generic checks, based on the start state
        if self._curState.canCreateDb():
            self._curState.assertIfExistThenSuccess(tasks, TaskCreateDb)
            # not assertAtMostOneSuccess: multiple creations and drops may interleave

        if self._curState.canDropDb():
            self._curState.assertIfExistThenSuccess(tasks, TaskDropDb)
            # not assertAtMostOneSuccess: drop-create-drop may happen within one step

        # Super table create/drop, data insertion and reads are not checked
        # generically: the whole DB may be dropped in the same step.

        newState = self._findCurrentState()
        logger.debug("[STT] New DB state determined: {}".format(newState))
        self._curState.verifyTasksToState(tasks, newState) # can old state move to new state through the tasks?
        self._curState = newState

    def pickTaskType(self):
        """Pick a runnable task type, weighted by the state it leads to."""
        taskTypes = self.getTaskTypes() # all the task types we can choose from at curent state
        weights = []
        for tt in taskTypes:
            endState = tt.getEndState()
            if endState is not None:
                weights.append(self._stateWeights[endState.getValIndex()]) # TODO: change to a method
            else:
                weights.append(10) # read data task, default to 10: TODO: change to a constant
        i = self._weighted_choice_sub(weights)
        return taskTypes[i]

    def _weighted_choice_sub(self, weights): # ref: https://eli.thegreenplace.net/2010/01/22/weighted-random-generation-in-python/
        """Return an index into `weights`, chosen with probability proportional to its weight."""
        rnd = random.random() * sum(weights) # TODO: use our dice to ensure it being determinstic?
        for i, w in enumerate(weights):
            rnd -= w
            if rnd < 0:
                return i
        # Reached only through floating-point rounding (rnd ends at exactly 0) or when
        # no weight is positive. Return a usable index instead of falling off the end (None).
        positive = [i for i, w in enumerate(weights) if w > 0]
        return positive[-1] if positive else len(weights) - 1

# Manager of the Database Data/Connection
class DbManager():
    """Owns the DB connection and the state machine.

    Also hands out table numbers, timestamps and column values for generated data.
    """

    def __init__(self, resetDb = True):
        """Open the DB connection (native or REST, per gConfig.connector_type).

        :param resetDb: if True, call resetDb() on the connection before the
            state machine inspects the database.
        """
        self.tableNumQueue = LinearQueue()
        self._lastTick = self.setupLastTick() # initial date time tick
        self._lastInt  = 0 # next one is initial integer
        self._lock = threading.RLock()

        self._dbConn = DbConn.createNative() if (gConfig.connector_type=='native') else DbConn.createRest()
        try:
            self._dbConn.open() # may throw taos.error.ProgrammingError: disconnected
        except taos.error.ProgrammingError as err:
            if ( err.msg == 'client disconnected' ): # cannot open DB connection
                print("Cannot establish DB connection, please re-run script without parameter, and follow the instructions.")
                sys.exit(1) # non-zero status: the test could not run (previously exited with 0)
            else:
                raise
        except Exception:
            print("[=] Unexpected exception")
            raise

        if resetDb :
            self._dbConn.resetDb() # drop and recreate DB

        self._stateMachine = StateMechine(self._dbConn) # Do this after dbConn is in proper shape

    def getDbConn(self):
        return self._dbConn

    def getStateMachine(self) -> StateMechine :
        return self._stateMachine

    # Pick a starting time tick so that, whenever we run this test, we can safely
    # create 100,000 records without repeating a timestamp when the test is re-run
    # 3 minutes (180 seconds) later. Wall-clock seconds since 2020-06-01 are
    # wrapped into a window of 8*12*30 days (~8 years) divided by 500, then
    # scaled back up by 500, and added to 2012-01-01 (default "keep" is 10 years).
    # TODO: what if it goes beyond 10 years into the future
    # TODO: fix the error as result of above: "tsdb timestamp is out of range"
    def setupLastTick(self):
        t1 = datetime.datetime(2020, 6, 1)
        t2 = datetime.datetime.now()
        elSec = int(t2.timestamp() - t1.timestamp()) # maybe a very large number, takes 69 years to exceed Python int range
        elSec2 = (  elSec % (8 * 12 * 30 * 24 * 60 * 60 / 500 ) ) * 500 # seconds within the ~8-year window
        t3 = datetime.datetime(2012, 1, 1) # default "keep" is 10 years
        t4 = datetime.datetime.fromtimestamp( t3.timestamp() + elSec2) # see explanation above
        logger.info("Setting up TICKS to start from: {}".format(t4))
        return t4

    def pickAndAllocateTable(self): # pick any table, and "use" it
        return self.tableNumQueue.pickAndAllocate()

    def addTable(self):
        with self._lock:
            tIndex = self.tableNumQueue.push()
        return tIndex

    @classmethod
    def getFixedSuperTableName(cls):
        return "fs_table"

    def releaseTable(self, i): # return the table back, so others can use it
        self.tableNumQueue.release(i)

    def getNextTick(self):
        """Return a timestamp one second after the previous one (thread-safe)."""
        with self._lock: # prevent duplicate tick
            self._lastTick += datetime.timedelta(0, 1) # add one second to it
            return self._lastTick

    def getNextInt(self):
        """Return the next integer of a counter starting at 1 (thread-safe)."""
        with self._lock:
            self._lastInt += 1
            return self._lastInt

    def getNextBinary(self):
        return "Beijing_Shanghai_Los_Angeles_New_York_San_Francisco_Chicago_Beijing_Shanghai_Los_Angeles_New_York_San_Francisco_Chicago_{}".format(self.getNextInt())

    def getNextFloat(self):
        return 0.9 + self.getNextInt()

    def getTableNameToDelete(self):
        """Pop a table number and return its "table_N" name, or False if none."""
        tblNum = self.tableNumQueue.pop() # TODO: race condition!
        if ( not tblNum ): # maybe false. NOTE(review): a table number 0 would also be rejected; confirm LinearQueue numbering
            return False

        return "table_{}".format(tblNum)

    def cleanUp(self):
        self._dbConn.close()

class TaskExecutor():
    """Per-step context handed to tasks; also records the largest data values written."""

    class BoundedList:
        """Ascending list holding at most `size` of the largest values added."""

        def __init__(self, size = 10):
            self._size = size
            self._list = []

        def add(self, n: int) :
            """Insert `n` in sorted position, keeping only the `size` largest values.

            While the list has room every value is accepted. Once full, a value no
            larger than the current minimum is dropped. Otherwise the minimum is
            evicted to make room.
            """
            # First index whose value is >= n, or the end of the list.
            insPos = next((i for i, v in enumerate(self._list) if n <= v), len(self._list))
            if insPos == 0 and len(self._list) >= self._size:
                return # full, and n would become the minimum only to be evicted at once
            self._list.insert(insPos, n)
            if len(self._list) > self._size:
                del self._list[0] # evict the smallest value

        def __str__(self):
            return repr(self._list)

    _boundedList = BoundedList() # shared by all executors (class attribute)

    def __init__(self, curStep):
        self._curStep = curStep

    @classmethod
    def getBoundedList(cls):
        return cls._boundedList

    def getCurStep(self):
        return self._curStep

    def execute(self, task: Task, wt: WorkerThread): # execute a task on a thread
        task.execute(wt)

    def recordDataMark(self, n: int):
        """Record `n` among the top numbers written (reported by printStats)."""
        self._boundedList.add(n)

class Task():
    """Base class of all tasks; subclasses implement _executeInternal()."""

    taskSn = 100

    @classmethod
    def allocTaskNum(cls):
        """Return the next globally unique task serial number."""
        Task.taskSn += 1 # IMPORTANT: cannot use cls.taskSn, since each sub class will have a copy
        return Task.taskSn

    def __init__(self, dbManager: DbManager, execStats: ExecutionStats):
        self._dbManager = dbManager
        self._workerThread = None
        self._err = None
        self._aborted = False
        self._curStep = None
        self._numRows = None # Number of rows affected

        # Assign an incremental task serial number
        self._taskNum = self.allocTaskNum()

        self._execStats = execStats
        self._lastSql = "" # last SQL executed/attempted

    def isSuccess(self):
        return self._err is None

    def isAborted(self):
        return self._aborted

    def clone(self): # TODO: why do we need this again?
        """Return a fresh task of the same class sharing the manager and stats."""
        newTask = self.__class__(self._dbManager, self._execStats)
        return newTask

    def logDebug(self, msg):
        self._workerThread.logDebug("Step[{}.{}] {}".format(self._curStep, self._taskNum, msg))

    def logInfo(self, msg):
        self._workerThread.logInfo("Step[{}.{}] {}".format(self._curStep, self._taskNum, msg))

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        raise RuntimeError("To be implemeted by child classes, class name: {}".format(self.__class__.__name__))

    def execute(self, wt: WorkerThread):
        """Run this task on worker thread `wt`, recording outcome and statistics.

        Acceptable TAOS errors mark the task as failed. Other TAOS errors re-raise
        in debug mode, otherwise they mark the task failed and aborted.
        """
        wt.verifyThreadSelf()
        self._workerThread = wt # type: ignore

        te = wt.getTaskExecutor()
        self._curStep = te.getCurStep()
        self.logDebug("[-] executing task {}...".format(self.__class__.__name__))

        self._err = None
        self._execStats.beginTaskType(self.__class__.__name__) # mark beginning
        try:
            self._executeInternal(te, wt) # TODO: no return value?
        except taos.error.ProgrammingError as err:
            # negative errno: adding 0x80000000 recovers the low-order error code
            errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno
            if ( errno2 in [0x200, 0x360, 0x362, 0x36A, 0x36B, 0x36D, 0x381, 0x380, 0x383, 0x503, 0x600,
                1000 # REST catch-all error
                ]) : # allowed errors
                self.logDebug("[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, self._lastSql))
                print("_", end="", flush=True)
                self._err = err
            else:
                errMsg = "[=] Unexpected Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, self._lastSql)
                self.logDebug(errMsg)
                if gConfig.debug :
                    raise # so that we see full stack
                else: # non-debug
                    print("\n\n----------------------------\nProgram ABORTED Due to Unexpected TAOS Error: \n\n{}\n".format(errMsg) +
                        "----------------------------\n")
                    self._err = err
                    self._aborted = True
        except Exception:
            self.logDebug("[=] Unexpected exception, SQL: {}".format(self._lastSql))
            raise
        self._execStats.endTaskType(self.__class__.__name__, self.isSuccess())

        self.logDebug("[X] task execution completed, {}, status: {}".format(self.__class__.__name__, "Success" if self.isSuccess() else "Failure"))
        self._execStats.incExecCount(self.__class__.__name__, self.isSuccess()) # TODO: merge with above.

    def execSql(self, sql):
        """Execute `sql` on the DbManager's own connection (not a worker thread's)."""
        self._lastSql = sql
        # DbManager has no execute() method; go through its connection object.
        return self._dbManager.getDbConn().execute(sql)

    def execWtSql(self, wt: WorkerThread, sql): # execute an SQL on the worker thread
        self._lastSql = sql
        return wt.execSql(sql)

    def queryWtSql(self, wt: WorkerThread, sql): # run a query on the worker thread
        self._lastSql = sql
        return wt.querySql(sql)

    def getQueryResult(self, wt: WorkerThread): # fetch the worker thread's last query result
        return wt.getQueryResult()

class ExecutionStats:
    """Collects per-task-type execution counts and timing for the final report."""

    def __init__(self):
        self._execTimes: Dict[str, List[int]] = {} # task class name -> [total, success] counts
        self._tasksInProgress = 0
        self._lock = threading.Lock()
        self._firstTaskStartTime = None
        self._execStartTime = None
        self._elapsedTime = 0.0 # total elapsed time
        self._accRunTime = 0.0 # accumulated run time

        self._failed = False
        self._failureReason = None

    def startExec(self):
        self._execStartTime = time.time()

    def endExec(self):
        self._elapsedTime = time.time() - self._execStartTime

    def incExecCount(self, klassName, isSuccess):
        """Count one execution of `klassName`, plus one success if `isSuccess`."""
        with self._lock: # called from Task.execute() on worker threads
            t = self._execTimes.setdefault(klassName, [0, 0])
            t[0] += 1 # index 0 has the "total" execution times
            if isSuccess:
                t[1] += 1 # index 1 has the "success" execution times

    def beginTaskType(self, klassName):
        """Mark a task as started; the first concurrent task starts a busy period."""
        with self._lock:
            if self._tasksInProgress == 0 : # starting a new round
                self._firstTaskStartTime = time.time() # I am now the first task
            self._tasksInProgress += 1

    def endTaskType(self, klassName, isSuccess):
        """Mark a task as finished; the last one closes the busy period."""
        with self._lock:
            self._tasksInProgress -= 1
            if self._tasksInProgress == 0 : # all tasks have stopped
                self._accRunTime += (time.time() - self._firstTaskStartTime)
                self._firstTaskStartTime = None

    def registerFailure(self, reason):
        self._failed = True
        self._failureReason = reason

    def printStats(self):
        """Log a summary of the run: outcome, per-task counts and timings."""
        logger.info("----------------------------------------------------------------------")
        logger.info("| Crash_Gen test {}, with the following stats:".
            format("FAILED (reason: {})".format(self._failureReason) if self._failed else "SUCCEEDED"))
        logger.info("| Task Execution Times (success/total):")
        execTimesAny = 0
        for k, n in self._execTimes.items():
            execTimesAny += n[0]
            logger.info("|    {0:<24}: {1}/{2}".format(k,n[1],n[0]))

        logger.info("| Total Tasks Executed (success or not): {} ".format(execTimesAny))
        logger.info("| Total Tasks In Progress at End: {}".format(self._tasksInProgress))
        logger.info("| Total Task Busy Time (elapsed time when any task is in progress): {:.3f} seconds".format(self._accRunTime))
        # Guard against ZeroDivisionError when no task was executed at all.
        avgTime = self._accRunTime / execTimesAny if execTimesAny > 0 else 0.0
        logger.info("| Average Per-Task Execution Time: {:.3f} seconds".format(avgTime))
        logger.info("| Total Elapsed Time (from wall clock): {:.3f} seconds".format(self._elapsedTime))
        logger.info("| Top numbers written: {}".format(TaskExecutor.getBoundedList()))
        logger.info("----------------------------------------------------------------------")

class StateTransitionTask(Task):
    """Base class for tasks that may move the database to another state.

    Subclasses override getEndState() and canBeginFrom(). StateMechine discovers
    them through StateTransitionTask.__subclasses__().
    """

    LARGE_NUMBER_OF_TABLES = 35
    SMALL_NUMBER_OF_TABLES = 3
    LARGE_NUMBER_OF_RECORDS = 50
    SMALL_NUMBER_OF_RECORDS = 3

    @classmethod
    def getInfo(cls): # each sub class should supply their own information
        raise RuntimeError("Overriding method expected")

    _endState = None
    @classmethod
    def getEndState(cls): # TODO: optimize by calling it fewer times
        """Return the state this task leads to, or None if it does not change state."""
        raise RuntimeError("Overriding method expected")

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        """Return whether this task type may run while the DB is in `state`."""
        raise RuntimeError("must be overriden")

    @classmethod
    def getRegTableName(cls, i):
        return "db.reg_table_{}".format(i)

    def execute(self, wt: WorkerThread):
        super().execute(wt)

class TaskCreateDb(StateTransitionTask):
    """Create database `db`; leads to DB_ONLY."""

    @classmethod
    def getEndState(cls):
        return StateDbOnly()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canCreateDb()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        self.execWtSql(wt, "create database db")

class TaskDropDb(StateTransitionTask):
    """Drop database `db`; leads to EMPTY."""

    @classmethod
    def getEndState(cls):
        return StateEmpty()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canDropDb()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        self.execWtSql(wt, "drop database db")
        logger.debug("[OPS] database dropped at {}".format(time.time()))

class TaskCreateSuperTable(StateTransitionTask):
    """Create the fixed super table in `db`; leads to SUPER_TABLE_ONLY."""

    @classmethod
    def getEndState(cls):
        return StateSuperTableOnly()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canCreateFixedSuperTable()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        if not wt.dbInUse(): # no DB yet, to the best of our knowledge
            logger.debug("Skipping task, no DB yet")
            return

        tblName = self._dbManager.getFixedSuperTableName()
        # wt.execSql("use db")    # should always be in place
        self.execWtSql(wt, "create table db.{} (ts timestamp, speed int) tags (b binary(200), f float) ".format(tblName))
        # No need to create the regular tables, INSERT will do that automatically

class TaskReadData(StateTransitionTask):
    """List the super table's regular tables and read each one; does not change state.

    With a 1-in-5 chance it instead closes and reopens the worker's connection.
    """

    @classmethod
    def getEndState(cls):
        return None # meaning doesn't affect state

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canReadData()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        sTbName = self._dbManager.getFixedSuperTableName()
        self.queryWtSql(wt, "select TBNAME from db.{}".format(sTbName)) # TODO: analyze result set later

        if random.randrange(5) == 0 : # 1 in 5 chance, simulate a broken connection. TODO: break connection in all situations
            wt.getDbConn().close()
            wt.getDbConn().open()
        else:
            rTables = self.getQueryResult(wt) # wt.getDbConn().getQueryResult()
            for rTbName in rTables : # regular tables; first column holds the name
                self.execWtSql(wt, "select * from db.{}".format(rTbName[0]))

        # tdSql.query(" cars where tbname in ('carzero', 'carone')")

class TaskDropSuperTable(StateTransitionTask):
    """Drop the fixed super table, half of the time dropping its regular tables one by one first."""

    @classmethod
    def getEndState(cls):
        return StateDbOnly()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canDropFixedSuperTable()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        # 1/2 chance, we'll drop the regular tables one by one, in a randomized sequence
        if Dice.throw(2) == 0:
            # Two more indices than the data-adding task writes to are tried, presumably
            # so that some drops hit non-existent tables.
            tblSeq = list(range(2 + (self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES)))
            random.shuffle(tblSeq)
            tickOutput = False  # if we have spitted out a "d"/"f" character for "drop regular table"
            isSuccess = True  # turns False once a drop hits the acceptable "invalid table name" error
            for i in tblSeq:
                regTableName = self.getRegTableName(i)  # "db.reg_table_{}".format(i)
                try:
                    self.execWtSql(wt, "drop table {}".format(regTableName))  # nRows always 0, like MySQL
                except taos.error.ProgrammingError as err:
                    # correcting for strange error number scheme
                    errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno
                    if errno2 in [0x362]:  # mnode invalid table name
                        isSuccess = False
                        logger.debug("[DB] Acceptable error when dropping a table")
                    else:
                        # Still best-effort (move on to the next table), but do not hide it
                        logger.info("[DB] Unexpected error when dropping table {}: errno=0x{:X}, {}".format(
                            regTableName, errno2, err))
                    continue  # try to delete next regular table

                if not tickOutput:
                    tickOutput = True  # Print only one time
                    print("d" if isSuccess else "f", end="", flush=True)

        # Drop the super table itself
        tblName = self._dbManager.getFixedSuperTableName()
        self.execWtSql(wt, "drop table db.{}".format(tblName))


class TaskAlterTags(StateTransitionTask):
    """Issue one randomly chosen ALTER statement on the tags of the fixed super table.

    The statements add, drop, or change tags named ``extraTag``/``newTag``. No error
    is caught here; whatever ``execWtSql`` raises propagates.
    """

    @classmethod
    def getEndState(cls):
        return None  # altering tags does not affect the state

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        # Same precondition as dropping the super table: if we can drop it, we can alter tags
        return state.canDropFixedSuperTable()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        tblName = self._dbManager.getFixedSuperTableName()
        # One statement per face of a 4-sided dice
        alterations = (
            "alter table db.{} add tag extraTag int",
            "alter table db.{} drop tag extraTag",
            "alter table db.{} drop tag newTag",
            "alter table db.{} change tag extraTag newTag",
        )
        chosen = alterations[Dice.throw(4)]
        self.execWtSql(wt, chosen.format(tblName))


class TaskAddData(StateTransitionTask):
    """Insert rows into each regular table of the fixed super table, in random table order.

    Regular tables are created on demand by ``insert into ... using <super table> tags (...)``.
    When ``gConfig.record_ops`` is set, each insert is written to two fsync'ed files:
    one line before the write and one after it succeeded (useful for power-off tests).
    """

    # Track which table is being actively worked on. This is a class attribute, so it is
    # shared by every TaskAddData instance.
    activeTable: Set[int] = set()

    # We use these two files to record operations to DB, useful for power-off tests
    fAddLogReady = None
    fAddLogDone = None

    @classmethod
    def prepToRecordOps(cls):
        """Lazily open the two operation-recording files, if ``gConfig.record_ops`` is set."""
        if gConfig.record_ops:
            if cls.fAddLogReady is None:
                logger.info("Recording in a file operations to be performed...")
                cls.fAddLogReady = open("add_log_ready.txt", "w")
            if cls.fAddLogDone is None:
                logger.info("Recording in a file operations completed...")
                cls.fAddLogDone = open("add_log_done.txt", "w")

    @classmethod
    def getEndState(cls):
        return StateHasData()

    @classmethod
    def canBeginFrom(cls, state: AnyState):
        return state.canAddData()

    def _executeInternal(self, te: TaskExecutor, wt: WorkerThread):
        # wt.execSql("use db") # TODO: seems to be an INSERT bug to require this
        tblSeq = list(range(self.LARGE_NUMBER_OF_TABLES if gConfig.larger_data else self.SMALL_NUMBER_OF_TABLES))
        random.shuffle(tblSeq)
        for i in tblSeq:
            # Only the task that marked the table active may unmark it. Otherwise a table
            # that another task is still writing to would be reported as idle.
            markedByUs = i not in self.activeTable
            if markedByUs:
                self.activeTable.add(i)  # marking it active
            else:  # wow already active: concurrent insertion into this table
                print("x", end="", flush=True)
            try:
                self._addRecordsToTable(te, wt, i)
            finally:
                # Unmark even when an insert failed, so the table does not stay "active" forever
                if markedByUs:
                    self.activeTable.discard(i)  # not raising an error, unlike remove

    def _addRecordsToTable(self, te: TaskExecutor, wt: WorkerThread, tblIndex: int):
        """Insert the configured number of rows into regular table number ``tblIndex``."""
        ds = self._dbManager
        # No need to shuffle data sequence, unless later we decide to do non-increment insertion
        regTableName = self.getRegTableName(tblIndex)  # "db.reg_table_{}".format(i)
        numRecords = self.LARGE_NUMBER_OF_RECORDS if gConfig.larger_data else self.SMALL_NUMBER_OF_RECORDS
        for _ in range(numRecords):
            nextInt = ds.getNextInt()
            if gConfig.record_ops:
                self.prepToRecordOps()
                self.fAddLogReady.write("Ready to write {} to {}\n".format(nextInt, regTableName))
                self.fAddLogReady.flush()
                os.fsync(self.fAddLogReady)
            sql = "insert into {} using {} tags ('{}', {}) values ('{}', {});".format(
                regTableName,
                ds.getFixedSuperTableName(),
                ds.getNextBinary(), ds.getNextFloat(),
                ds.getNextTick(), nextInt)
            self.execWtSql(wt, sql)
            # Successfully wrote the data into the DB, let's record it somehow
            te.recordDataMark(nextInt)
            if gConfig.record_ops:
                self.fAddLogDone.write("Wrote {} to {}\n".format(nextInt, regTableName))
                self.fAddLogDone.flush()
                os.fsync(self.fAddLogDone)


class Dice():
    """Deterministic random number generator used by the tasks.

    Wraps the process-wide ``random`` module. It must be seeded exactly once, via
    ``seed``, before any throw. Seeding first checks that seed 0 reproduces a known
    sequence, i.e. that the system RNG is deterministic.
    """
    seeded = False  # static, uninitialized

    @classmethod
    def seed(cls, s):  # static
        if not cls.seeded:
            cls.verifyRNG()
            random.seed(s)
            cls.seeded = True  # TODO: protect against multi-threading
            return
        raise RuntimeError("Cannot seed the random generator more than once")

    @classmethod
    def verifyRNG(cls):
        """Raise RuntimeError unless seed 0 yields 864, 394, 776 as its first three draws below 1000."""
        random.seed(0)
        firstDraws = tuple(random.randrange(0, 1000) for _ in range(3))
        if firstDraws != (864, 394, 776):
            raise RuntimeError("System RNG is not deterministic")

    @classmethod
    def throw(cls, stop):
        """Return a number in [0, stop)."""
        return cls.throwRange(0, stop)

    @classmethod
    def throwRange(cls, start, stop):
        """Return a number in [start, stop); the dice must have been seeded."""
        if cls.seeded:
            return random.randrange(start, stop)
        raise RuntimeError("Cannot throw dice before seeding it")


# (Unused) work dispatcher: anyone needing to carry out work should simply come here
# class WorkDispatcher():
#     def __init__(self, dbState):
#         # self.totalNumMethods = 2
#         self.tasks = [
#             # CreateTableTask(dbState), # Obsolete
#             # DropTableTask(dbState),
#             # AddDataTask(dbState),
#         ]

#     def throwDice(self):
#         max = len(self.tasks) - 1 
#         dRes = random.randint(0, max)
#         # logger.debug("Threw the dice in range [{},{}], and got: {}".format(0,max,dRes))
#         return dRes

#     def pickTask(self):
#         dice = self.throwDice()
#         return self.tasks[dice]

#     def doWork(self, workerThread):
#         task = self.pickTask()
#         task.execute(workerThread)


class LoggingFilter(logging.Filter):
    """Record filter for the CrashGen logger; it currently accepts every record.

    Records below INFO take a separate branch, where category-based suppression
    (e.g. of "[TRD]" messages) can be reinstated.
    """

    def filter(self, record: logging.LogRecord):
        infoOrAbove = record.levelno >= logging.INFO
        if not infoOrAbove:
            # No suppression rules are active for lower-level records at the moment
            return True
        return True  # info or above always log

class MyLoggingAdapter(logging.LoggerAdapter):
    """Logger adapter that prefixes each message with a short thread tag, e.g. "[1234]"."""

    def process(self, msg, kwargs):
        # Last four decimal digits of the current thread's identifier
        threadTag = threading.get_ident() % 10000
        taggedMsg = "[{}]{}".format(threadTag, msg)
        return taggedMsg, kwargs

class SvcManager:
    """Run the TDengine service (taosd) as a child process and echo its output.

    SIGINT/SIGTERM are forwarded to the child as SIGINT. ``run`` returns once a stop
    was requested, or once the child's output has ended and been fully printed.
    """

    def __init__(self):
        print("Starting service manager")
        signal.signal(signal.SIGTERM, self.sigIntHandler)
        signal.signal(signal.SIGINT, self.sigIntHandler)
        self.ioThread = None
        self.subProcess = None
        self.shouldStop = False
        self.status = MainExec.STATUS_RUNNING

    def svcOutputReader(self, out: IO, queue):
        """Thread body: put each line of ``out`` (right-stripped) on ``queue`` until EOF, then close ``out``."""
        for line in out:  # iter(out.readline, b''):
            queue.put(line.rstrip())  # get rid of new lines
        print("No more output from incoming IO")  # meaning sub process must have died
        out.close()

    def sigIntHandler(self, signalNumber, frame):
        if self.status != MainExec.STATUS_RUNNING:
            print("Ignoring repeated SIGINT...")
            return  # do nothing if it's already not running
        self.status = MainExec.STATUS_STOPPING  # immediately set our status

        print("Terminating program...")
        if self.subProcess is not None:  # the signal may arrive before run() started the service
            self.subProcess.send_signal(signal.SIGINT)
        self.shouldStop = True
        self.joinIoThread()

    def joinIoThread(self):
        if self.ioThread:
            self.ioThread.join()
            self.ioThread = None

    def run(self):
        """Start taosd and print its output until asked to stop or until it exits, then reap it."""
        import subprocess  # not among the file's explicit top-level imports

        ON_POSIX = 'posix' in sys.builtin_module_names
        svcCmd = ['../../build/build/bin/taosd', '-c', '../../build/test/cfg']
        # svcCmd = ['vmstat', '1']
        self.subProcess = subprocess.Popen(svcCmd, stdout=subprocess.PIPE, bufsize=1, close_fds=ON_POSIX, text=True)
        q = Queue()
        # Local reference: the signal handler may reset self.ioThread to None at any time
        reader = threading.Thread(target=self.svcOutputReader, args=(self.subProcess.stdout, q))
        reader.daemon = True  # thread dies with the program
        self.ioThread = reader
        reader.start()

        while True:
            try:
                line = q.get_nowait()  # getting output at fast speed
            except Empty:
                # Once the reader has finished, nothing more can be queued, so an empty
                # queue means all output was printed: the service is gone.
                if not reader.is_alive() and q.empty():
                    print("Service output ended")
                    break
                time.sleep(2.3)  # wait only if there's no output
            else:  # got line
                print(line)
            if self.shouldStop:
                print("Ending main Svc thread")
                break

        print("end of loop")

        self.joinIoThread()
        self.subProcess.wait()  # reap the child so it does not linger as a zombie
        print("Finished")


class ClientManager:
    """Run the crash-generating client against a TDengine service.

    It first prints the numbers currently stored in the DB (to verify data
    durability), then creates a ThreadCoordinator and runs it. SIGINT/SIGTERM ask
    that coordinator to stop.
    """

    def __init__(self):
        print("Starting client manager")
        signal.signal(signal.SIGTERM, self.sigIntHandler)
        signal.signal(signal.SIGINT, self.sigIntHandler)

        self.status = MainExec.STATUS_RUNNING
        self.tc = None  # ThreadCoordinator, created in run()

    def sigIntHandler(self, signalNumber, frame):
        if self.status != MainExec.STATUS_RUNNING:
            print("Ignoring repeated SIGINT...")
            return  # do nothing if it's already not running
        self.status = MainExec.STATUS_STOPPING  # immediately set our status

        print("Terminating program...")
        if self.tc is None:
            # Still preparing (e.g. during the 2-second pause in _printLastNumbers): there is
            # no coordinator to wind down, so abort the way an unhandled SIGINT would.
            raise KeyboardInterrupt
        self.tc.requestToStop()

    def _printLastNumbers(self):  # to verify data durability
        """Collect every ``speed`` value of the regular tables in ``db`` into a BoundedList and print it.

        Returns early, printing nothing, when ``show databases`` returns 0 (presumably no
        database exists); otherwise it pauses 2 seconds after printing.
        """
        dbManager = DbManager(resetDb=False)
        dbc = dbManager.getDbConn()
        if dbc.query("show databases") == 0:  # no database
            return

        dbc.execute("use db")
        sTbName = dbManager.getFixedSuperTableName()

        # get all regular tables
        dbc.query("select TBNAME from db.{}".format(sTbName))  # TODO: analyze result set later
        rTables = dbc.getQueryResult()

        bList = TaskExecutor.BoundedList()
        for rTbName in rTables:  # regular tables
            dbc.query("select speed from db.{}".format(rTbName[0]))
            numbers = dbc.getQueryResult()
            for row in numbers:
                bList.add(row[0])

        print("Top numbers in DB right now: {}".format(bList))
        print("TDengine client execution is about to start in 2 seconds...")
        time.sleep(2.0)
        dbManager = None  # release?

    def prepare(self):
        self._printLastNumbers()

    def run(self):
        self._printLastNumbers()

        dbManager = DbManager()  # Regular function
        Dice.seed(0)  # initial seeding of dice
        thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps)
        self.tc = ThreadCoordinator(thPool, dbManager)

        self.tc.run()
        self.conclude()

    def conclude(self):
        self.tc.printStats()
        self.tc.getDbManager().cleanUp()


class MainExec:
    """Top-level entry points: run the crash-generating client, or run the TDengine service."""

    # Status values used by the SvcManager/ClientManager signal handlers
    STATUS_RUNNING = 1
    STATUS_STOPPING = 2
    # STATUS_STOPPED = 3 # Not used yet

    @classmethod
    def runClient(cls):
        ClientManager().run()

    @classmethod
    def runService(cls):
        SvcManager().run()

    @classmethod
    def runTemp(cls):  # for debugging purposes
        """Scratch entry point for ad-hoc debugging; it currently does nothing.

        It used to hold commented-out experiments: reading the regular tables of the
        fixed super table to exercise disk reads, and polling "show databases" in a loop.
        """
        return


def main():
    """Command-line entry point: parse options into ``gConfig``, set up ``logger``, then run.

    With ``-e/--run-tdengine`` the TDengine service is run in the foreground
    (``MainExec.runService``); otherwise the crash-generating client is run
    (``MainExec.runClient``).
    """
    # Super cool Python argument library: https://docs.python.org/3/library/argparse.html
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=textwrap.dedent('''\
            TDengine Auto Crash Generator (PLEASE NOTICE the Prerequisites Below)
            ---------------------------------------------------------------------
            1. You build TDengine in the top level ./build directory, as described in official docs
            2. You run the server there before this script: ./build/bin/taosd -c test/cfg

            '''))

    # Options with non-boolean defaults use %(default)s, so the help text always shows
    # the real default value.
    parser.add_argument('-c', '--connector-type', action='store', default='native', type=str,
                        help='Connector type to use: native, rest, or mixed (default: %(default)s)')
    parser.add_argument('-d', '--debug', action='store_true',
                        help='Turn on DEBUG mode for more logging (default: false)')
    parser.add_argument('-e', '--run-tdengine', action='store_true',
                        help='Run TDengine service in foreground (default: false)')
    parser.add_argument('-l', '--larger-data', action='store_true',
                        help='Write larger amount of data during write operations (default: false)')
    parser.add_argument('-p', '--per-thread-db-connection', action='store_true',
                        help='Use a separate db connection for each worker thread, instead of a single shared one (default: false)')
    parser.add_argument('-r', '--record-ops', action='store_true',
                        help='Use a pair of always-fsynced files to record operations performing + performed, for power-off tests (default: false)')
    parser.add_argument('-s', '--max-steps', action='store', default=1000, type=int,
                        help='Maximum number of steps to run (default: %(default)s)')
    parser.add_argument('-t', '--num-threads', action='store', default=5, type=int,
                        help='Number of threads to run (default: %(default)s)')

    global gConfig
    gConfig = parser.parse_args()

    # Logging Stuff
    global logger
    _logger = logging.getLogger('CrashGen')  # real logger
    _logger.addFilter(LoggingFilter())
    ch = logging.StreamHandler()
    _logger.addHandler(ch)

    # LoggerAdapter expects a dict-like "extra" argument
    logger = MyLoggingAdapter(_logger, {})  # Logging adapter, to be used as a logger
    logger.setLevel(logging.DEBUG if gConfig.debug else logging.INFO)

    # Run server or client
    if gConfig.run_tdengine:  # run server
        MainExec.runService()
    else:
        MainExec.runClient()

    # logger.info("Crash_Gen execution finished")

if __name__ == "__main__":
    # Script entry point: only runs when this file is executed directly, not when imported
    main()