提交 f7a0b6b8 编写于 作者: S Steven Li

Enhanced crash_gen tool to verify dnode being in ready status, plus additional refactoring

上级 b74add25
......@@ -14,22 +14,17 @@
# For type hinting before definition, ref:
# https://stackoverflow.com/questions/33533148/how-do-i-specify-that-the-return-type-of-a-method-is-the-same-as-the-class-itsel
from __future__ import annotations
import taos
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.log import *
from typing import Set
from typing import Dict
from typing import List
from requests.auth import HTTPBasicAuth
import textwrap
import time
import datetime
import random
import logging
import threading
import requests
import copy
import argparse
import getopt
......@@ -44,6 +39,10 @@ import gc
from .service_manager import ServiceManager, TdeInstance
from .misc import Logging, Status, CrashGenError, Dice, Helper, Progress
from .db import DbConn, MyTDSql, DbConnNative, DbManager
import taos
import requests
# Require Python 3
if sys.version_info[0] < 3:
......@@ -78,10 +77,11 @@ class WorkerThread:
# Let us have a DB connection of our own
if (gConfig.per_thread_db_connection): # type: ignore
# print("connector_type = {}".format(gConfig.connector_type))
if gConfig.connector_type == 'native':
self._dbConn = DbConn.createNative()
tInst = gContainer.defTdeInstance
if gConfig.connector_type == 'native':
self._dbConn = DbConn.createNative(tInst.getDbTarget())
elif gConfig.connector_type == 'rest':
self._dbConn = DbConn.createRest()
self._dbConn = DbConn.createRest(tInst.getDbTarget())
elif gConfig.connector_type == 'mixed':
if Dice.throw(2) == 0: # 1/2 chance
self._dbConn = DbConn.createNative()
......@@ -505,7 +505,7 @@ class ThreadCoordinator:
# pick a task type for current state
db = self.pickDatabase()
taskType = db.getStateMachine().pickTaskType() # type: Task
taskType = db.getStateMachine().pickTaskType() # dynamic name of class
return taskType(self._execStats, db) # create a task from it
def resetExecutedTasks(self):
......@@ -619,342 +619,6 @@ class LinearQueue():
return ret
class DbConn:
TYPE_NATIVE = "native-c"
TYPE_REST = "rest-api"
TYPE_INVALID = "invalid"
@classmethod
def create(cls, connType):
if connType == cls.TYPE_NATIVE:
return DbConnNative()
elif connType == cls.TYPE_REST:
return DbConnRest()
else:
raise RuntimeError(
"Unexpected connection type: {}".format(connType))
@classmethod
def createNative(cls):
return cls.create(cls.TYPE_NATIVE)
@classmethod
def createRest(cls):
return cls.create(cls.TYPE_REST)
def __init__(self):
self.isOpen = False
self._type = self.TYPE_INVALID
self._lastSql = None
def getLastSql(self):
return self._lastSql
def open(self):
if (self.isOpen):
raise RuntimeError("Cannot re-open an existing DB connection")
# below implemented by child classes
self.openByType()
Logging.debug("[DB] data connection opened, type = {}".format(self._type))
self.isOpen = True
def close(self):
raise RuntimeError("Unexpected execution, should be overriden")
def queryScalar(self, sql) -> int:
return self._queryAny(sql)
def queryString(self, sql) -> str:
return self._queryAny(sql)
def _queryAny(self, sql): # actual query result as an int
if (not self.isOpen):
raise RuntimeError("Cannot query database until connection is open")
nRows = self.query(sql)
if nRows != 1:
raise taos.error.ProgrammingError(
"Unexpected result for query: {}, rows = {}".format(sql, nRows),
(0x991 if nRows==0 else 0x992)
)
if self.getResultRows() != 1 or self.getResultCols() != 1:
raise RuntimeError("Unexpected result set for query: {}".format(sql))
return self.getQueryResult()[0][0]
def use(self, dbName):
self.execute("use {}".format(dbName))
def existsDatabase(self, dbName: str):
''' Check if a certain database exists '''
self.query("show databases")
dbs = [v[0] for v in self.getQueryResult()] # ref: https://stackoverflow.com/questions/643823/python-list-transformation
# ret2 = dbName in dbs
# print("dbs = {}, str = {}, ret2={}, type2={}".format(dbs, dbName,ret2, type(dbName)))
return dbName in dbs # TODO: super weird type mangling seen, once here
def hasTables(self):
return self.query("show tables") > 0
def execute(self, sql):
''' Return the number of rows affected'''
raise RuntimeError("Unexpected execution, should be overriden")
def safeExecute(self, sql):
'''Safely execute any SQL query, returning True/False upon success/failure'''
try:
self.execute(sql)
return True # ignore num of results, return success
except taos.error.ProgrammingError as err:
return False # failed, for whatever TAOS reason
# Not possile to reach here, non-TAOS exception would have been thrown
def query(self, sql) -> int: # return num rows returned
''' Return the number of rows affected'''
raise RuntimeError("Unexpected execution, should be overriden")
def openByType(self):
raise RuntimeError("Unexpected execution, should be overriden")
def getQueryResult(self):
raise RuntimeError("Unexpected execution, should be overriden")
def getResultRows(self):
raise RuntimeError("Unexpected execution, should be overriden")
def getResultCols(self):
raise RuntimeError("Unexpected execution, should be overriden")
# Sample: curl -u root:taosdata -d "show databases" localhost:6020/rest/sql
class DbConnRest(DbConn):
def __init__(self):
super().__init__()
self._type = self.TYPE_REST
self._url = "http://localhost:6041/rest/sql" # fixed for now
self._result = None
def openByType(self): # Open connection
pass # do nothing, always open
def close(self):
if (not self.isOpen):
raise RuntimeError("Cannot clean up database until connection is open")
# Do nothing for REST
Logging.debug("[DB] REST Database connection closed")
self.isOpen = False
def _doSql(self, sql):
self._lastSql = sql # remember this, last SQL attempted
try:
r = requests.post(self._url,
data = sql,
auth = HTTPBasicAuth('root', 'taosdata'))
except:
print("REST API Failure (TODO: more info here)")
raise
rj = r.json()
# Sanity check for the "Json Result"
if ('status' not in rj):
raise RuntimeError("No status in REST response")
if rj['status'] == 'error': # clearly reported error
if ('code' not in rj): # error without code
raise RuntimeError("REST error return without code")
errno = rj['code'] # May need to massage this in the future
# print("Raising programming error with REST return: {}".format(rj))
raise taos.error.ProgrammingError(
rj['desc'], errno) # todo: check existance of 'desc'
if rj['status'] != 'succ': # better be this
raise RuntimeError(
"Unexpected REST return status: {}".format(
rj['status']))
nRows = rj['rows'] if ('rows' in rj) else 0
self._result = rj
return nRows
def execute(self, sql):
if (not self.isOpen):
raise RuntimeError(
"Cannot execute database commands until connection is open")
Logging.debug("[SQL-REST] Executing SQL: {}".format(sql))
nRows = self._doSql(sql)
Logging.debug(
"[SQL-REST] Execution Result, nRows = {}, SQL = {}".format(nRows, sql))
return nRows
def query(self, sql): # return rows affected
return self.execute(sql)
def getQueryResult(self):
return self._result['data']
def getResultRows(self):
print(self._result)
raise RuntimeError("TBD")
# return self._tdSql.queryRows
def getResultCols(self):
print(self._result)
raise RuntimeError("TBD")
# Duplicate code from TDMySQL, TODO: merge all this into DbConnNative
class MyTDSql:
# Class variables
_clsLock = threading.Lock() # class wide locking
longestQuery = None # type: str
longestQueryTime = 0.0 # seconds
lqStartTime = 0.0
# lqEndTime = 0.0 # Not needed, as we have the two above already
def __init__(self, hostAddr, cfgPath):
# Make the DB connection
self._conn = taos.connect(host=hostAddr, config=cfgPath)
self._cursor = self._conn.cursor()
self.queryRows = 0
self.queryCols = 0
self.affectedRows = 0
# def init(self, cursor, log=True):
# self.cursor = cursor
# if (log):
# caller = inspect.getframeinfo(inspect.stack()[1][0])
# self.cursor.log(caller.filename + ".sql")
def close(self):
self._cursor.close() # can we double close?
self._conn.close() # TODO: very important, cursor close does NOT close DB connection!
self._cursor.close()
def _execInternal(self, sql):
startTime = time.time()
ret = self._cursor.execute(sql)
# print("\nSQL success: {}".format(sql))
queryTime = time.time() - startTime
# Record the query time
cls = self.__class__
if queryTime > (cls.longestQueryTime + 0.01) :
with cls._clsLock:
cls.longestQuery = sql
cls.longestQueryTime = queryTime
cls.lqStartTime = startTime
return ret
def query(self, sql):
self.sql = sql
try:
self._execInternal(sql)
self.queryResult = self._cursor.fetchall()
self.queryRows = len(self.queryResult)
self.queryCols = len(self._cursor.description)
except Exception as e:
# caller = inspect.getframeinfo(inspect.stack()[1][0])
# args = (caller.filename, caller.lineno, sql, repr(e))
# tdLog.exit("%s(%d) failed: sql:%s, %s" % args)
raise
return self.queryRows
def execute(self, sql):
self.sql = sql
try:
self.affectedRows = self._execInternal(sql)
except Exception as e:
# caller = inspect.getframeinfo(inspect.stack()[1][0])
# args = (caller.filename, caller.lineno, sql, repr(e))
# tdLog.exit("%s(%d) failed: sql:%s, %s" % args)
raise
return self.affectedRows
class DbConnNative(DbConn):
# Class variables
_lock = threading.Lock()
_connInfoDisplayed = False
totalConnections = 0 # Not private
def __init__(self):
super().__init__()
self._type = self.TYPE_NATIVE
self._conn = None
# self._cursor = None
def openByType(self): # Open connection
global gContainer
tdeInstance = gContainer.defTdeInstance # set up in ClientManager, type: TdeInstance
# cfgPath = self.getBuildPath() + "/test/cfg"
cfgPath = tdeInstance.getCfgDir()
hostAddr = tdeInstance.getHostAddr()
cls = self.__class__ # Get the class, to access class variables
with cls._lock: # force single threading for opening DB connections. # TODO: whaaat??!!!
if not cls._connInfoDisplayed:
cls._connInfoDisplayed = True # updating CLASS variable
Logging.info("Initiating TAOS native connection to {}, using config at {}".format(hostAddr, cfgPath))
# Make the connection
# self._conn = taos.connect(host=hostAddr, config=cfgPath) # TODO: make configurable
# self._cursor = self._conn.cursor()
# Record the count in the class
self._tdSql = MyTDSql(hostAddr, cfgPath) # making DB connection
cls.totalConnections += 1
self._tdSql.execute('reset query cache')
# self._cursor.execute('use db') # do this at the beginning of every
# Open connection
# self._tdSql = MyTDSql()
# self._tdSql.init(self._cursor)
def close(self):
if (not self.isOpen):
raise RuntimeError("Cannot clean up database until connection is open")
self._tdSql.close()
# Decrement the class wide counter
cls = self.__class__ # Get the class, to access class variables
with cls._lock:
cls.totalConnections -= 1
Logging.debug("[DB] Database connection closed")
self.isOpen = False
def execute(self, sql):
if (not self.isOpen):
raise RuntimeError("Cannot execute database commands until connection is open")
Logging.debug("[SQL] Executing SQL: {}".format(sql))
self._lastSql = sql
nRows = self._tdSql.execute(sql)
Logging.debug(
"[SQL] Execution Result, nRows = {}, SQL = {}".format(
nRows, sql))
return nRows
def query(self, sql): # return rows affected
if (not self.isOpen):
raise RuntimeError(
"Cannot query database until connection is open")
Logging.debug("[SQL] Executing SQL: {}".format(sql))
self._lastSql = sql
nRows = self._tdSql.query(sql)
Logging.debug(
"[SQL] Query Result, nRows = {}, SQL = {}".format(
nRows, sql))
return nRows
# results are in: return self._tdSql.queryResult
def getQueryResult(self):
return self._tdSql.queryResult
def getResultRows(self):
return self._tdSql.queryRows
def getResultCols(self):
return self._tdSql.queryCols
class AnyState:
STATE_INVALID = -1
STATE_EMPTY = 0 # nothing there, no even a DB
......@@ -1439,64 +1103,6 @@ class Database:
return ret
class DbManager():
''' This is a wrapper around DbConn(), to make it easier to use.
TODO: rename this to DbConnManager
'''
def __init__(self):
self.tableNumQueue = LinearQueue() # TODO: delete?
# self.openDbServerConnection()
self._dbConn = DbConn.createNative() if (
gConfig.connector_type == 'native') else DbConn.createRest()
try:
self._dbConn.open() # may throw taos.error.ProgrammingError: disconnected
except taos.error.ProgrammingError as err:
# print("Error type: {}, msg: {}, value: {}".format(type(err), err.msg, err))
if (err.msg == 'client disconnected'): # cannot open DB connection
print(
"Cannot establish DB connection, please re-run script without parameter, and follow the instructions.")
sys.exit(2)
else:
print("Failed to connect to DB, errno = {}, msg: {}"
.format(Helper.convertErrno(err.errno), err.msg))
raise
except BaseException:
print("[=] Unexpected exception")
raise
# Do this after dbConn is in proper shape
# Moved to Database()
# self._stateMachine = StateMechine(self._dbConn)
def getDbConn(self):
return self._dbConn
# TODO: not used any more, to delete
def pickAndAllocateTable(self): # pick any table, and "use" it
return self.tableNumQueue.pickAndAllocate()
# TODO: Not used any more, to delete
def addTable(self):
with self._lock:
tIndex = self.tableNumQueue.push()
return tIndex
# Not used any more, to delete
def releaseTable(self, i): # return the table back, so others can use it
self.tableNumQueue.release(i)
# TODO: not used any more, delete
def getTableNameToDelete(self):
tblNum = self.tableNumQueue.pop() # TODO: race condition!
if (not tblNum): # maybe false
return False
return "table_{}".format(tblNum)
def cleanUp(self):
self._dbConn.close()
class TaskExecutor():
class BoundedList:
def __init__(self, size=10):
......@@ -2402,7 +2008,7 @@ class ClientManager:
global gContainer
tInst = gContainer.defTdeInstance = TdeInstance() # "subdir to hold the instance"
dbManager = DbManager() # Regular function
dbManager = DbManager(gConfig.connector_type, tInst.getDbTarget()) # Regular function
thPool = ThreadPool(gConfig.num_threads, gConfig.max_steps)
self.tc = ThreadCoordinator(thPool, dbManager)
......
from __future__ import annotations
import sys
import time
import threading
import requests
from requests.auth import HTTPBasicAuth
import taos
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.log import *
from .misc import Logging, CrashGenError, Helper
# from .service_manager import TdeInstance
class DbConn:
TYPE_NATIVE = "native-c"
TYPE_REST = "rest-api"
TYPE_INVALID = "invalid"
@classmethod
def create(cls, connType, dbTarget):
if connType == cls.TYPE_NATIVE:
return DbConnNative(dbTarget)
elif connType == cls.TYPE_REST:
return DbConnRest(dbTarget)
else:
raise RuntimeError(
"Unexpected connection type: {}".format(connType))
@classmethod
def createNative(cls, dbTarget) -> DbConn:
return cls.create(cls.TYPE_NATIVE, dbTarget)
@classmethod
def createRest(cls, dbTarget) -> DbConn:
return cls.create(cls.TYPE_REST, dbTarget)
def __init__(self, dbTarget):
self.isOpen = False
self._type = self.TYPE_INVALID
self._lastSql = None
self._dbTarget = dbTarget
def getLastSql(self):
return self._lastSql
def open(self):
if (self.isOpen):
raise RuntimeError("Cannot re-open an existing DB connection")
# below implemented by child classes
self.openByType()
Logging.debug("[DB] data connection opened, type = {}".format(self._type))
self.isOpen = True
def close(self):
raise RuntimeError("Unexpected execution, should be overriden")
def queryScalar(self, sql) -> int:
return self._queryAny(sql)
def queryString(self, sql) -> str:
return self._queryAny(sql)
def _queryAny(self, sql): # actual query result as an int
if (not self.isOpen):
raise RuntimeError("Cannot query database until connection is open")
nRows = self.query(sql)
if nRows != 1:
raise taos.error.ProgrammingError(
"Unexpected result for query: {}, rows = {}".format(sql, nRows),
(0x991 if nRows==0 else 0x992)
)
if self.getResultRows() != 1 or self.getResultCols() != 1:
raise RuntimeError("Unexpected result set for query: {}".format(sql))
return self.getQueryResult()[0][0]
def use(self, dbName):
self.execute("use {}".format(dbName))
def existsDatabase(self, dbName: str):
''' Check if a certain database exists '''
self.query("show databases")
dbs = [v[0] for v in self.getQueryResult()] # ref: https://stackoverflow.com/questions/643823/python-list-transformation
# ret2 = dbName in dbs
# print("dbs = {}, str = {}, ret2={}, type2={}".format(dbs, dbName,ret2, type(dbName)))
return dbName in dbs # TODO: super weird type mangling seen, once here
def hasTables(self):
return self.query("show tables") > 0
def execute(self, sql):
''' Return the number of rows affected'''
raise RuntimeError("Unexpected execution, should be overriden")
def safeExecute(self, sql):
'''Safely execute any SQL query, returning True/False upon success/failure'''
try:
self.execute(sql)
return True # ignore num of results, return success
except taos.error.ProgrammingError as err:
return False # failed, for whatever TAOS reason
# Not possile to reach here, non-TAOS exception would have been thrown
def query(self, sql) -> int: # return num rows returned
''' Return the number of rows affected'''
raise RuntimeError("Unexpected execution, should be overriden")
def openByType(self):
raise RuntimeError("Unexpected execution, should be overriden")
def getQueryResult(self):
raise RuntimeError("Unexpected execution, should be overriden")
def getResultRows(self):
raise RuntimeError("Unexpected execution, should be overriden")
def getResultCols(self):
raise RuntimeError("Unexpected execution, should be overriden")
# Sample: curl -u root:taosdata -d "show databases" localhost:6020/rest/sql
class DbConnRest(DbConn):
REST_PORT_INCREMENT = 11
def __init__(self, dbTarget: DbTarget):
super().__init__(dbTarget)
self._type = self.TYPE_REST
restPort = dbTarget.port + 11
self._url = "http://{}:{}/rest/sql".format(
dbTarget.hostAddr, dbTarget.port + self.REST_PORT_INCREMENT)
self._result = None
def openByType(self): # Open connection
pass # do nothing, always open
def close(self):
if (not self.isOpen):
raise RuntimeError("Cannot clean up database until connection is open")
# Do nothing for REST
Logging.debug("[DB] REST Database connection closed")
self.isOpen = False
def _doSql(self, sql):
self._lastSql = sql # remember this, last SQL attempted
try:
r = requests.post(self._url,
data = sql,
auth = HTTPBasicAuth('root', 'taosdata'))
except:
print("REST API Failure (TODO: more info here)")
raise
rj = r.json()
# Sanity check for the "Json Result"
if ('status' not in rj):
raise RuntimeError("No status in REST response")
if rj['status'] == 'error': # clearly reported error
if ('code' not in rj): # error without code
raise RuntimeError("REST error return without code")
errno = rj['code'] # May need to massage this in the future
# print("Raising programming error with REST return: {}".format(rj))
raise taos.error.ProgrammingError(
rj['desc'], errno) # todo: check existance of 'desc'
if rj['status'] != 'succ': # better be this
raise RuntimeError(
"Unexpected REST return status: {}".format(
rj['status']))
nRows = rj['rows'] if ('rows' in rj) else 0
self._result = rj
return nRows
def execute(self, sql):
if (not self.isOpen):
raise RuntimeError(
"Cannot execute database commands until connection is open")
Logging.debug("[SQL-REST] Executing SQL: {}".format(sql))
nRows = self._doSql(sql)
Logging.debug(
"[SQL-REST] Execution Result, nRows = {}, SQL = {}".format(nRows, sql))
return nRows
def query(self, sql): # return rows affected
return self.execute(sql)
def getQueryResult(self):
return self._result['data']
def getResultRows(self):
print(self._result)
raise RuntimeError("TBD") # TODO: finish here to support -v under -c rest
# return self._tdSql.queryRows
def getResultCols(self):
print(self._result)
raise RuntimeError("TBD")
# Duplicate code from TDMySQL, TODO: merge all this into DbConnNative
class MyTDSql:
# Class variables
_clsLock = threading.Lock() # class wide locking
longestQuery = None # type: str
longestQueryTime = 0.0 # seconds
lqStartTime = 0.0
# lqEndTime = 0.0 # Not needed, as we have the two above already
def __init__(self, hostAddr, cfgPath):
# Make the DB connection
self._conn = taos.connect(host=hostAddr, config=cfgPath)
self._cursor = self._conn.cursor()
self.queryRows = 0
self.queryCols = 0
self.affectedRows = 0
# def init(self, cursor, log=True):
# self.cursor = cursor
# if (log):
# caller = inspect.getframeinfo(inspect.stack()[1][0])
# self.cursor.log(caller.filename + ".sql")
def close(self):
self._cursor.close() # can we double close?
self._conn.close() # TODO: very important, cursor close does NOT close DB connection!
self._cursor.close()
def _execInternal(self, sql):
startTime = time.time()
ret = self._cursor.execute(sql)
# print("\nSQL success: {}".format(sql))
queryTime = time.time() - startTime
# Record the query time
cls = self.__class__
if queryTime > (cls.longestQueryTime + 0.01) :
with cls._clsLock:
cls.longestQuery = sql
cls.longestQueryTime = queryTime
cls.lqStartTime = startTime
return ret
def query(self, sql):
self.sql = sql
try:
self._execInternal(sql)
self.queryResult = self._cursor.fetchall()
self.queryRows = len(self.queryResult)
self.queryCols = len(self._cursor.description)
except Exception as e:
# caller = inspect.getframeinfo(inspect.stack()[1][0])
# args = (caller.filename, caller.lineno, sql, repr(e))
# tdLog.exit("%s(%d) failed: sql:%s, %s" % args)
raise
return self.queryRows
def execute(self, sql):
self.sql = sql
try:
self.affectedRows = self._execInternal(sql)
except Exception as e:
# caller = inspect.getframeinfo(inspect.stack()[1][0])
# args = (caller.filename, caller.lineno, sql, repr(e))
# tdLog.exit("%s(%d) failed: sql:%s, %s" % args)
raise
return self.affectedRows
class DbTarget:
def __init__(self, cfgPath, hostAddr, port):
self.cfgPath = cfgPath
self.hostAddr = hostAddr
self.port = port
def __repr__(self):
return "[DbTarget: cfgPath={}, host={}:{}]".format(
self.cfgPath, self.hostAddr, self.port)
class DbConnNative(DbConn):
# Class variables
_lock = threading.Lock()
_connInfoDisplayed = False
totalConnections = 0 # Not private
def __init__(self, dbTarget):
super().__init__(dbTarget)
self._type = self.TYPE_NATIVE
self._conn = None
# self._cursor = None
def openByType(self): # Open connection
# global gContainer
# tInst = tInst or gContainer.defTdeInstance # set up in ClientManager, type: TdeInstance
# cfgPath = self.getBuildPath() + "/test/cfg"
# cfgPath = tInst.getCfgDir()
# hostAddr = tInst.getHostAddr()
cls = self.__class__ # Get the class, to access class variables
with cls._lock: # force single threading for opening DB connections. # TODO: whaaat??!!!
dbTarget = self._dbTarget
if not cls._connInfoDisplayed:
cls._connInfoDisplayed = True # updating CLASS variable
Logging.info("Initiating TAOS native connection to {}".format(dbTarget))
# Make the connection
# self._conn = taos.connect(host=hostAddr, config=cfgPath) # TODO: make configurable
# self._cursor = self._conn.cursor()
# Record the count in the class
self._tdSql = MyTDSql(dbTarget.hostAddr, dbTarget.cfgPath) # making DB connection
cls.totalConnections += 1
self._tdSql.execute('reset query cache')
# self._cursor.execute('use db') # do this at the beginning of every
# Open connection
# self._tdSql = MyTDSql()
# self._tdSql.init(self._cursor)
def close(self):
if (not self.isOpen):
raise RuntimeError("Cannot clean up database until connection is open")
self._tdSql.close()
# Decrement the class wide counter
cls = self.__class__ # Get the class, to access class variables
with cls._lock:
cls.totalConnections -= 1
Logging.debug("[DB] Database connection closed")
self.isOpen = False
def execute(self, sql):
if (not self.isOpen):
raise RuntimeError("Cannot execute database commands until connection is open")
Logging.debug("[SQL] Executing SQL: {}".format(sql))
self._lastSql = sql
nRows = self._tdSql.execute(sql)
Logging.debug(
"[SQL] Execution Result, nRows = {}, SQL = {}".format(
nRows, sql))
return nRows
def query(self, sql): # return rows affected
if (not self.isOpen):
raise RuntimeError(
"Cannot query database until connection is open")
Logging.debug("[SQL] Executing SQL: {}".format(sql))
self._lastSql = sql
nRows = self._tdSql.query(sql)
Logging.debug(
"[SQL] Query Result, nRows = {}, SQL = {}".format(
nRows, sql))
return nRows
# results are in: return self._tdSql.queryResult
def getQueryResult(self):
return self._tdSql.queryResult
def getResultRows(self):
return self._tdSql.queryRows
def getResultCols(self):
return self._tdSql.queryCols
class DbManager():
''' This is a wrapper around DbConn(), to make it easier to use.
TODO: rename this to DbConnManager
'''
def __init__(self, cType, dbTarget):
# self.tableNumQueue = LinearQueue() # TODO: delete?
# self.openDbServerConnection()
self._dbConn = DbConn.createNative(dbTarget) if (
cType == 'native') else DbConn.createRest(dbTarget)
try:
self._dbConn.open() # may throw taos.error.ProgrammingError: disconnected
except taos.error.ProgrammingError as err:
# print("Error type: {}, msg: {}, value: {}".format(type(err), err.msg, err))
if (err.msg == 'client disconnected'): # cannot open DB connection
print(
"Cannot establish DB connection, please re-run script without parameter, and follow the instructions.")
sys.exit(2)
else:
print("Failed to connect to DB, errno = {}, msg: {}"
.format(Helper.convertErrno(err.errno), err.msg))
raise
except BaseException:
print("[=] Unexpected exception")
raise
# Do this after dbConn is in proper shape
# Moved to Database()
# self._stateMachine = StateMechine(self._dbConn)
def getDbConn(self):
return self._dbConn
# TODO: not used any more, to delete
def pickAndAllocateTable(self): # pick any table, and "use" it
return self.tableNumQueue.pickAndAllocate()
# TODO: Not used any more, to delete
def addTable(self):
with self._lock:
tIndex = self.tableNumQueue.push()
return tIndex
# Not used any more, to delete
def releaseTable(self, i): # return the table back, so others can use it
self.tableNumQueue.release(i)
# TODO: not used any more, delete
def getTableNameToDelete(self):
tblNum = self.tableNumQueue.pop() # TODO: race condition!
if (not tblNum): # maybe false
return False
return "table_{}".format(tblNum)
def cleanUp(self):
self._dbConn.close()
......@@ -16,7 +16,9 @@ except:
sys.exit(-1)
from queue import Queue, Empty
from .misc import Logging, Status, CrashGenError, Dice
from .db import DbConn, DbTarget
class TdeInstance():
"""
......@@ -45,9 +47,17 @@ class TdeInstance():
.format(selfPath, projPath))
return buildPath
def __init__(self, subdir='test'):
self._buildDir = self._getBuildPath()
self._subdir = '/' + subdir # TODO: tolerate "/"
def __init__(self, subdir='test', port=6030, fepPort=6030):
self._buildDir = self._getBuildPath()
self._subdir = '/' + subdir # TODO: tolerate "/"
self._port = port # TODO: support different IP address too
self._fepPort = fepPort
def getDbTarget(self):
return DbTarget(self.getCfgDir(), self.getHostAddr(), self._port)
def getPort(self):
return self._port
def __repr__(self):
return "[TdeInstance: {}, subdir={}]".format(self._buildDir, self._subdir)
......@@ -74,9 +84,10 @@ class TdeInstance():
os.makedirs(cfgDir, exist_ok=True) # like "mkdir -p"
# Now we have a good cfg dir
cfgValues = {
'runDir': self.getRunDir(),
'ip': '127.0.0.1', # TODO: change to a network addressable ip
'port': 6030,
'runDir': self.getRunDir(),
'ip': '127.0.0.1', # TODO: change to a network addressable ip
'port': self._port,
'fepPort': self._fepPort,
}
cfgTemplate = """
dataDir {runDir}/data
......@@ -84,7 +95,7 @@ logDir {runDir}/log
charset UTF-8
firstEp {ip}:{port}
firstEp {ip}:{fepPort}
fqdn {ip}
serverPort {port}
......@@ -236,9 +247,10 @@ class TdeSubProcess:
class ServiceManager:
PAUSE_BETWEEN_IPC_CHECK = 1.2 # seconds between checks on STDOUT of sub process
def __init__(self, numDnodes = 1):
def __init__(self, numDnodes = 1): # Otherwise we run a cluster
Logging.info("TDengine Service Manager (TSM) created")
self._numDnodes = numDnodes # >1 means we have a cluster
self._lock = threading.Lock()
# signal.signal(signal.SIGTERM, self.sigIntHandler) # Moved to MainExec
# signal.signal(signal.SIGINT, self.sigIntHandler)
# signal.signal(signal.SIGUSR1, self.sigUsrHandler) # different handler!
......@@ -246,12 +258,20 @@ class ServiceManager:
self.inSigHandler = False
# self._status = MainExec.STATUS_RUNNING # set inside
# _startTaosService()
self._runCluster = (numDnodes >= 1)
self.svcMgrThreads = [] # type: List[ServiceManagerThread]
for i in range(0, numDnodes):
self.svcMgrThreads.append(ServiceManagerThread(i))
self._lock = threading.Lock()
# self._isRestarting = False
def _createThread(self, dnIndex):
if not self._runCluster: # single instance
return ServiceManagerThread(0)
# Create all threads in a cluster
subdir = 'cluster_dnode_{}'.format(dnIndex)
fepPort= 6030 # firstEP Port
port = fepPort + dnIndex * 100
ti = TdeInstance(subdir, port, fepPort)
return ServiceManagerThread(dnIndex, ti)
def _doMenu(self):
choice = ""
......@@ -488,11 +508,33 @@ class ServiceManagerThread:
if self._status == Status.STATUS_RUNNING:
Logging.info("[] TDengine service READY to process requests")
Logging.info("[] TAOS service started: {}".format(self))
self._verifyDnode(self._tInst) # query and ensure dnode is ready
return # now we've started
# TODO: handle failure-to-start better?
self.procIpcBatch(100, True) # display output before cronking out, trim to last 20 msgs, force output
raise RuntimeError("TDengine service did not start successfully: {}".format(self))
def _verifyDnode(self, tInst: TdeInstance):
dbc = DbConn.createNative(tInst.getDbTarget())
dbc.open()
dbc.query("show dnodes")
# dbc.query("DESCRIBE {}.{}".format(dbName, self._stName))
cols = dbc.getQueryResult() # id,end_point,vnodes,cores,status,role,create_time,offline reason
# ret = {row[0]:row[1] for row in stCols if row[3]=='TAG'} # name:type
isValid = False
for col in cols:
print("col = {}".format(col))
ep = col[1].split(':') # 10.1.30.2:6030
print("ep={}".format(ep))
if tInst.getPort() == int(ep[1]): # That's us
print("Valid Dnode matched!")
isValid = True # now we are valid
break
if not isValid:
raise RuntimeError("Failed to start Dnode, port = {}, expected: {}".
format(ep[1], tInst.getPort()))
dbc.close()
def stop(self):
# can be called from both main thread or signal handler
print("Terminating TDengine service running as the sub process...")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册