提交 39d6b3d9 编写于 作者: H Hui Li

Merge branch 'develop' into hotfix/test

...@@ -18,7 +18,7 @@ import ( ...@@ -18,7 +18,7 @@ import (
"sync" "sync"
"time" "time"
_ "github.com/taosdata/TDengine/src/connector/go/taosSql" _ "github.com/taosdata/driver-go/taosSql"
) )
const ( const (
......
...@@ -402,6 +402,7 @@ static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { ...@@ -402,6 +402,7 @@ static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
void *pVnode = vnodeAcquireVnode(pCreate->cfg.vgId); void *pVnode = vnodeAcquireVnode(pCreate->cfg.vgId);
if (pVnode != NULL) { if (pVnode != NULL) {
dDebug("vgId:%d, already exist, processed as alter msg", pCreate->cfg.vgId);
int32_t code = vnodeAlter(pVnode, pCreate); int32_t code = vnodeAlter(pVnode, pCreate);
vnodeRelease(pVnode); vnodeRelease(pVnode);
return code; return code;
......
...@@ -129,9 +129,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_CLUSTER_CFG_INCONSISTENT, 0, 0x0335, "mnode clus ...@@ -129,9 +129,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_CLUSTER_CFG_INCONSISTENT, 0, 0x0335, "mnode clus
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACCT_ALREADY_EXIST, 0, 0x0340, "mnode accounts already exist") TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACCT_ALREADY_EXIST, 0, 0x0340, "mnode accounts already exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ACCT, 0, 0x0341, "mnode invalid account") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ACCT, 0, 0x0341, "mnode invalid account")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ACCT_PARA, 0, 0x0342, "mnode invalid account parameter") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ACCT_OPTION, 0, 0x0342, "mnode invalid acct option")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ACCT_OPTION, 0, 0x0343, "mnode invalid acct option")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_ACCTS, 0, 0x0344, "mnode too many accounts")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_USER_ALREADY_EXIST, 0, 0x0350, "mnode user already exist") TAOS_DEFINE_ERROR(TSDB_CODE_MND_USER_ALREADY_EXIST, 0, 0x0350, "mnode user already exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_USER, 0, 0x0351, "mnode invalid user") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_USER, 0, 0x0351, "mnode invalid user")
...@@ -145,7 +143,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TABLE_ID, 0, 0x0361, "mnode inva ...@@ -145,7 +143,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TABLE_ID, 0, 0x0361, "mnode inva
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TABLE_NAME, 0, 0x0362, "mnode invalid table name") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TABLE_NAME, 0, 0x0362, "mnode invalid table name")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TABLE_TYPE, 0, 0x0363, "mnode invalid table type") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TABLE_TYPE, 0, 0x0363, "mnode invalid table type")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_TAGS, 0, 0x0364, "mnode too many tags") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_TAGS, 0, 0x0364, "mnode too many tags")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_TABLES, 0, 0x0365, "mnode too many tables")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_TIMESERIES, 0, 0x0366, "mnode not enough time series") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_TIMESERIES, 0, 0x0366, "mnode not enough time series")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_NOT_SUPER_TABLE, 0, 0x0367, "mnode no super table") // operation only available for super table TAOS_DEFINE_ERROR(TSDB_CODE_MND_NOT_SUPER_TABLE, 0, 0x0367, "mnode no super table") // operation only available for super table
TAOS_DEFINE_ERROR(TSDB_CODE_MND_COL_NAME_TOO_LONG, 0, 0x0368, "mnode column name too long") TAOS_DEFINE_ERROR(TSDB_CODE_MND_COL_NAME_TOO_LONG, 0, 0x0368, "mnode column name too long")
...@@ -167,7 +164,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_DND_MSG_NOT_PROCESSED, 0, 0x0400, "dnode mess ...@@ -167,7 +164,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_DND_MSG_NOT_PROCESSED, 0, 0x0400, "dnode mess
TAOS_DEFINE_ERROR(TSDB_CODE_DND_OUT_OF_MEMORY, 0, 0x0401, "dnode out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_DND_OUT_OF_MEMORY, 0, 0x0401, "dnode out of memory")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_NO_WRITE_ACCESS, 0, 0x0402, "dnode no disk write access") TAOS_DEFINE_ERROR(TSDB_CODE_DND_NO_WRITE_ACCESS, 0, 0x0402, "dnode no disk write access")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_INVALID_MSG_LEN, 0, 0x0403, "dnode invalid message length") TAOS_DEFINE_ERROR(TSDB_CODE_DND_INVALID_MSG_LEN, 0, 0x0403, "dnode invalid message length")
TAOS_DEFINE_ERROR(TSDB_CODE_DND_INVALID_FILE_FORMAT, 0, 0x0404, "dnode invalid file format")
// vnode // vnode
TAOS_DEFINE_ERROR(TSDB_CODE_VND_ACTION_IN_PROGRESS, 0, 0x0500, "vnode action in progress") TAOS_DEFINE_ERROR(TSDB_CODE_VND_ACTION_IN_PROGRESS, 0, 0x0500, "vnode action in progress")
......
...@@ -128,6 +128,7 @@ int32_t mnodeInitAccts() { ...@@ -128,6 +128,7 @@ int32_t mnodeInitAccts() {
void mnodeCleanupAccts() { void mnodeCleanupAccts() {
acctCleanUp(); acctCleanUp();
sdbCloseTable(tsAcctSdb); sdbCloseTable(tsAcctSdb);
tsAcctSdb = NULL;
} }
void *mnodeGetAcct(char *name) { void *mnodeGetAcct(char *name) {
......
...@@ -459,6 +459,7 @@ void mnodeMoveVgroupToHead(SVgObj *pVgroup) { ...@@ -459,6 +459,7 @@ void mnodeMoveVgroupToHead(SVgObj *pVgroup) {
void mnodeCleanupDbs() { void mnodeCleanupDbs() {
sdbCloseTable(tsDbSdb); sdbCloseTable(tsDbSdb);
tsDbSdb = NULL;
} }
static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
......
...@@ -176,6 +176,7 @@ int32_t mnodeInitDnodes() { ...@@ -176,6 +176,7 @@ int32_t mnodeInitDnodes() {
void mnodeCleanupDnodes() { void mnodeCleanupDnodes() {
sdbCloseTable(tsDnodeSdb); sdbCloseTable(tsDnodeSdb);
tsDnodeSdb = NULL;
} }
void *mnodeGetNextDnode(void *pIter, SDnodeObj **pDnode) { void *mnodeGetNextDnode(void *pIter, SDnodeObj **pDnode) {
......
...@@ -121,9 +121,9 @@ void mnodeCleanupSystem() { ...@@ -121,9 +121,9 @@ void mnodeCleanupSystem() {
dnodeFreeMnodeWqueue(); dnodeFreeMnodeWqueue();
dnodeFreeMnodeRqueue(); dnodeFreeMnodeRqueue();
dnodeFreeMnodePqueue(); dnodeFreeMnodePqueue();
mnodeCleanupComponents(sizeof(tsMnodeComponents) / sizeof(tsMnodeComponents[0]) - 1);
mnodeCleanupTimer(); mnodeCleanupTimer();
mnodeCleanupComponents(sizeof(tsMnodeComponents) / sizeof(tsMnodeComponents[0]) - 1);
mInfo("mnode is cleaned up"); mInfo("mnode is cleaned up");
} }
......
...@@ -165,6 +165,7 @@ int32_t mnodeInitMnodes() { ...@@ -165,6 +165,7 @@ int32_t mnodeInitMnodes() {
void mnodeCleanupMnodes() { void mnodeCleanupMnodes() {
sdbCloseTable(tsMnodeSdb); sdbCloseTable(tsMnodeSdb);
tsMnodeSdb = NULL;
mnodeMnodeDestroyLock(); mnodeMnodeDestroyLock();
} }
......
...@@ -376,6 +376,7 @@ static int32_t mnodeInitChildTables() { ...@@ -376,6 +376,7 @@ static int32_t mnodeInitChildTables() {
static void mnodeCleanupChildTables() { static void mnodeCleanupChildTables() {
sdbCloseTable(tsChildTableSdb); sdbCloseTable(tsChildTableSdb);
tsChildTableSdb = NULL;
} }
static void mnodeAddTableIntoStable(SSuperTableObj *pStable, SChildTableObj *pCtable) { static void mnodeAddTableIntoStable(SSuperTableObj *pStable, SChildTableObj *pCtable) {
...@@ -554,6 +555,7 @@ static int32_t mnodeInitSuperTables() { ...@@ -554,6 +555,7 @@ static int32_t mnodeInitSuperTables() {
static void mnodeCleanupSuperTables() { static void mnodeCleanupSuperTables() {
sdbCloseTable(tsSuperTableSdb); sdbCloseTable(tsSuperTableSdb);
tsSuperTableSdb = NULL;
} }
int32_t mnodeInitTables() { int32_t mnodeInitTables() {
......
...@@ -154,6 +154,7 @@ int32_t mnodeInitUsers() { ...@@ -154,6 +154,7 @@ int32_t mnodeInitUsers() {
void mnodeCleanupUsers() { void mnodeCleanupUsers() {
sdbCloseTable(tsUserSdb); sdbCloseTable(tsUserSdb);
tsUserSdb = NULL;
} }
SUserObj *mnodeGetUser(char *name) { SUserObj *mnodeGetUser(char *name) {
......
...@@ -427,6 +427,7 @@ void mnodeDropVgroup(SVgObj *pVgroup, void *ahandle) { ...@@ -427,6 +427,7 @@ void mnodeDropVgroup(SVgObj *pVgroup, void *ahandle) {
void mnodeCleanupVgroups() { void mnodeCleanupVgroups() {
sdbCloseTable(tsVgroupSdb); sdbCloseTable(tsVgroupSdb);
tsVgroupSdb = NULL;
} }
int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
...@@ -696,9 +697,9 @@ static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { ...@@ -696,9 +697,9 @@ static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
if (rpcMsg->ahandle == NULL) return; if (rpcMsg->ahandle == NULL) return;
SMnodeMsg *mnodeMsg = rpcMsg->ahandle; SMnodeMsg *mnodeMsg = rpcMsg->ahandle;
mnodeMsg->received++; atomic_add_fetch_8(&mnodeMsg->received, 1);
if (rpcMsg->code == TSDB_CODE_SUCCESS) { if (rpcMsg->code == TSDB_CODE_SUCCESS) {
mnodeMsg->successed++; atomic_add_fetch_8(&mnodeMsg->successed, 1);
} else { } else {
mnodeMsg->code = rpcMsg->code; mnodeMsg->code = rpcMsg->code;
} }
......
...@@ -93,16 +93,18 @@ void taosCloseQueue(taos_queue param) { ...@@ -93,16 +93,18 @@ void taosCloseQueue(taos_queue param) {
void *taosAllocateQitem(int size) { void *taosAllocateQitem(int size) {
STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + size, 1); STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + size, 1);
if (pNode == NULL) return NULL; if (pNode == NULL) return NULL;
uTrace("item:%p, node:%p is allocated", pNode->item, pNode);
return (void *)pNode->item; return (void *)pNode->item;
} }
void taosFreeQitem(void *param) { void taosFreeQitem(void *param) {
if (param == NULL) return; if (param == NULL) return;
uTrace("item:%p is freed", param);
char *temp = (char *)param; char *temp = (char *)param;
temp -= sizeof(STaosQnode); temp -= sizeof(STaosQnode);
uTrace("item:%p, node:%p is freed", param, temp);
free(temp); free(temp);
} }
......
...@@ -498,7 +498,17 @@ void tSkipListRemoveNode(SSkipList *pSkipList, SSkipListNode *pNode) { ...@@ -498,7 +498,17 @@ void tSkipListRemoveNode(SSkipList *pSkipList, SSkipListNode *pNode) {
if (pSkipList->lock) { if (pSkipList->lock) {
pthread_rwlock_wrlock(pSkipList->lock); pthread_rwlock_wrlock(pSkipList->lock);
} }
if (pSkipList->size == 1) {
assert(pSkipList->lastKey == SL_GET_NODE_KEY(pSkipList, pNode));
pSkipList->lastKey = 0;
} else {
if (pSkipList->lastKey == SL_GET_NODE_KEY(pSkipList, pNode)) {
SSkipListNode* prev = SL_GET_BACKWARD_POINTER(pNode, 0);
pSkipList->lastKey = SL_GET_NODE_KEY(pSkipList, prev);
}
}
for (int32_t j = level - 1; j >= 0; --j) { for (int32_t j = level - 1; j >= 0; --j) {
SSkipListNode* prev = SL_GET_BACKWARD_POINTER(pNode, j); SSkipListNode* prev = SL_GET_BACKWARD_POINTER(pNode, j);
SSkipListNode* next = SL_GET_FORWARD_POINTER(pNode, j); SSkipListNode* next = SL_GET_FORWARD_POINTER(pNode, j);
......
...@@ -161,11 +161,17 @@ int32_t vnodeDrop(int32_t vgId) { ...@@ -161,11 +161,17 @@ int32_t vnodeDrop(int32_t vgId) {
int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) { int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) {
SVnodeObj *pVnode = param; SVnodeObj *pVnode = param;
if (pVnode->status != TAOS_VN_STATUS_READY) // vnode in non-ready state and still needs to return success instead of TSDB_CODE_VND_INVALID_STATUS
return TSDB_CODE_VND_INVALID_STATUS; // cfgVersion can be corrected by status msg
if (pVnode->status != TAOS_VN_STATUS_READY) {
vDebug("vgId:%d, vnode is not ready, do alter operation later", pVnode->vgId);
return TSDB_CODE_SUCCESS;
}
if (pVnode->syncCfg.replica > 1 && pVnode->role == TAOS_SYNC_ROLE_UNSYNCED) // the vnode may always fail to synchronize because of it in low cfgVersion
return TSDB_CODE_VND_NOT_SYNCED; // so cannot use the following codes
// if (pVnode->syncCfg.replica > 1 && pVnode->role == TAOS_SYNC_ROLE_UNSYNCED)
// return TSDB_CODE_VND_NOT_SYNCED;
pVnode->status = TAOS_VN_STATUS_UPDATING; pVnode->status = TAOS_VN_STATUS_UPDATING;
......
...@@ -49,19 +49,26 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { ...@@ -49,19 +49,26 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
SVnodeObj *pVnode = (SVnodeObj *)param1; SVnodeObj *pVnode = (SVnodeObj *)param1;
SWalHead *pHead = param2; SWalHead *pHead = param2;
if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL) if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL) {
vDebug("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[pHead->msgType]);
return TSDB_CODE_VND_MSG_NOT_PROCESSED; return TSDB_CODE_VND_MSG_NOT_PROCESSED;
}
if (!(pVnode->accessState & TSDB_VN_WRITE_ACCCESS)) { if (!(pVnode->accessState & TSDB_VN_WRITE_ACCCESS)) {
vDebug("vgId:%d, msgType:%s not processed, no write auth", pVnode->vgId, taosMsg[pHead->msgType]);
return TSDB_CODE_VND_NO_WRITE_AUTH; return TSDB_CODE_VND_NO_WRITE_AUTH;
} }
if (pHead->version == 0) { // from client or CQ if (pHead->version == 0) { // from client or CQ
if (pVnode->status != TAOS_VN_STATUS_READY) if (pVnode->status != TAOS_VN_STATUS_READY) {
vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[pHead->msgType], pVnode->status);
return TSDB_CODE_VND_INVALID_STATUS; // it may be in deleting or closing state return TSDB_CODE_VND_INVALID_STATUS; // it may be in deleting or closing state
}
if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[pHead->msgType], pVnode->syncCfg.replica, pVnode->role);
return TSDB_CODE_RPC_NOT_READY; return TSDB_CODE_RPC_NOT_READY;
}
// assign version // assign version
pVnode->version++; pVnode->version++;
......
...@@ -1213,7 +1213,11 @@ class Task(): ...@@ -1213,7 +1213,11 @@ class Task():
self._executeInternal(te, wt) # TODO: no return value? self._executeInternal(te, wt) # TODO: no return value?
except taos.error.ProgrammingError as err: except taos.error.ProgrammingError as err:
errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno # correct error scheme errno2 = err.errno if (err.errno > 0) else 0x80000000 + err.errno # correct error scheme
if ( errno2 in [0x200, 0x360, 0x362, 0x36A, 0x36B, 0x36D, 0x381, 0x380, 0x383, 0x503, 0x600, if ( errno2 in [
0x05, # TSDB_CODE_RPC_NOT_READY
0x200, 0x360, 0x362, 0x36A, 0x36B, 0x36D, 0x381, 0x380, 0x383, 0x503,
0x510, # vnode not in ready state
0x600,
1000 # REST catch-all error 1000 # REST catch-all error
]) : # allowed errors ]) : # allowed errors
self.logDebug("[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, self._lastSql)) self.logDebug("[=] Acceptable Taos library exception: errno=0x{:X}, msg: {}, SQL: {}".format(errno2, err, self._lastSql))
...@@ -1629,73 +1633,219 @@ class MyLoggingAdapter(logging.LoggerAdapter): ...@@ -1629,73 +1633,219 @@ class MyLoggingAdapter(logging.LoggerAdapter):
# return '[%s] %s' % (self.extra['connid'], msg), kwargs # return '[%s] %s' % (self.extra['connid'], msg), kwargs
class SvcManager: class SvcManager:
MAX_QUEUE_SIZE = 10000
def __init__(self): def __init__(self):
print("Starting service manager") print("Starting service manager")
signal.signal(signal.SIGTERM, self.sigIntHandler) signal.signal(signal.SIGTERM, self.sigIntHandler)
signal.signal(signal.SIGINT, self.sigIntHandler) signal.signal(signal.SIGINT, self.sigIntHandler)
signal.signal(signal.SIGUSR1, self.sigUsrHandler)
self.ioThread = None self.ioThread = None
self.subProcess = None self.subProcess = None
self.shouldStop = False self.shouldStop = False
self.status = MainExec.STATUS_RUNNING # self.status = MainExec.STATUS_RUNNING # set inside _startTaosService()
def svcOutputReader(self, out: IO, queue): def svcOutputReader(self, out: IO, queue):
# print("This is the svcOutput Reader...") # Important Reference: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
for line in out : # iter(out.readline, b''): print("This is the svcOutput Reader...")
# for line in out :
for line in iter(out.readline, b''):
# print("Finished reading a line: {}".format(line)) # print("Finished reading a line: {}".format(line))
queue.put(line.rstrip()) # get rid of new lines # print("Adding item to queue...")
print("No more output from incoming IO") # meaning sub process must have died line = line.decode("utf-8").rstrip()
queue.put(line) # This might block, and then causing "out" buffer to block
print("_i", end="", flush=True)
# Trim the queue if necessary
oneTenthQSize = self.MAX_QUEUE_SIZE // 10
if (queue.qsize() >= (self.MAX_QUEUE_SIZE - oneTenthQSize) ) : # 90% full?
print("Triming IPC queue by: {}".format(oneTenthQSize))
for i in range(0, oneTenthQSize) :
try:
queue.get_nowait()
except Empty:
break # break out of for loop, no more trimming
if self.shouldStop :
print("Stopping to read output from sub process")
break
# queue.put(line)
print("\nNo more output (most likely) from IO thread managing TDengine service") # meaning sub process must have died
out.close() out.close()
def sigIntHandler(self, signalNumber, frame): def _doMenu(self):
choice = ""
while True:
print("\nInterrupting Service Program, Choose an Action: ")
print("1: Resume")
print("2: Terminate")
print("3: Restart")
# Remember to update the if range below
# print("Enter Choice: ", end="", flush=True)
while choice == "":
choice = input("Enter Choice: ")
if choice != "":
break # done with reading repeated input
if choice in ["1", "2", "3"]:
break # we are done with whole method
print("Invalid choice, please try again.")
choice = "" # reset
return choice
def sigUsrHandler(self, signalNumber, frame) :
print("Interrupting main thread execution upon SIGUSR1")
if self.status != MainExec.STATUS_RUNNING : if self.status != MainExec.STATUS_RUNNING :
print("Ignoring repeated SIGINT...") print("Ignoring repeated SIG...")
return # do nothing if it's already not running return # do nothing if it's already not running
self.status = MainExec.STATUS_STOPPING # immediately set our status self.status = MainExec.STATUS_STOPPING
choice = self._doMenu()
if choice == "1" :
self.sigHandlerResume() # TODO: can the sub-process be blocked due to us not reading from queue?
elif choice == "2" :
self.stopTaosService()
elif choice == "3" :
self.stopTaosService()
self.startTaosService()
else:
raise RuntimeError("Invalid menu choice: {}".format(choice))
print("Terminating program...") def sigIntHandler(self, signalNumber, frame):
self.subProcess.send_signal(signal.SIGINT) print("Sig INT Handler starting...")
self.shouldStop = True if self.status != MainExec.STATUS_RUNNING :
self.joinIoThread() print("Ignoring repeated SIG_INT...")
return
self.status = MainExec.STATUS_STOPPING # immediately set our status
self.stopTaosService()
print("INT signal handler returning...")
def sigHandlerResume(self) :
print("Resuming TDengine service manager thread (main thread)...\n\n")
self.status = MainExec.STATUS_RUNNING
def joinIoThread(self): def joinIoThread(self):
if self.ioThread : if self.ioThread :
self.ioThread.join() self.ioThread.join()
self.ioThread = None self.ioThread = None
else :
print("Joining empty thread, doing nothing")
TD_READY_MSG = "TDengine is initialized successfully"
def _procIpcBatch(self):
# Process all the output generated by the underlying sub process, managed by IO thread
while True :
try:
line = self.ipcQueue.get_nowait() # getting output at fast speed
print("_o", end="", flush=True)
if self.status == MainExec.STATUS_STARTING : # we are starting, let's see if we have started
if line.find(self.TD_READY_MSG) != -1 : # found
self.status = MainExec.STATUS_RUNNING
except Empty:
# time.sleep(2.3) # wait only if there's no output
# no more output
return # we are done with THIS BATCH
else: # got line
print(line)
def run(self): def _procIpcAll(self):
while True :
print("<", end="", flush=True)
self._procIpcBatch() # process one batch
# check if the ioThread is still running
if (not self.ioThread) or (not self.ioThread.is_alive()):
print("IO Thread (with subprocess) has ended, main thread now exiting...")
self.stopTaosService()
self._procIpcBatch() # one more batch
return # TODO: maybe one last batch?
# Maybe handler says we should exit now
if self.shouldStop:
print("Main thread ending all IPC processing with IOThread/SubProcess")
self._procIpcBatch() # one more batch
return
print(">", end="", flush=True)
time.sleep(0.5)
def startTaosService(self):
ON_POSIX = 'posix' in sys.builtin_module_names ON_POSIX = 'posix' in sys.builtin_module_names
svcCmd = ['../../build/build/bin/taosd', '-c', '../../build/test/cfg'] svcCmd = ['../../build/build/bin/taosd', '-c', '../../build/test/cfg']
# svcCmd = ['vmstat', '1'] # svcCmd = ['vmstat', '1']
self.subProcess = subprocess.Popen(svcCmd, stdout=subprocess.PIPE, bufsize=1, close_fds=ON_POSIX, text=True) if self.subProcess : # already there
q = Queue() raise RuntimeError("Corrupt process state")
self.ioThread = threading.Thread(target=self.svcOutputReader, args=(self.subProcess.stdout, q))
self.subProcess = subprocess.Popen(
svcCmd,
stdout=subprocess.PIPE,
# bufsize=1, # not supported in binary mode
close_fds=ON_POSIX) # had text=True, which interferred with reading EOF
self.ipcQueue = Queue()
if self.ioThread :
raise RuntimeError("Corrupt thread state")
self.ioThread = threading.Thread(target=self.svcOutputReader, args=(self.subProcess.stdout, self.ipcQueue))
self.ioThread.daemon = True # thread dies with the program self.ioThread.daemon = True # thread dies with the program
self.ioThread.start() self.ioThread.start()
self.shouldStop = False # don't let the main loop stop
self.status = MainExec.STATUS_STARTING
# wait for service to start
for i in range(0, 10) :
time.sleep(1.0)
self._procIpcBatch() # pump messages
print("_zz_", end="", flush=True)
if self.status == MainExec.STATUS_RUNNING :
print("TDengine service READY to process requests")
return # now we've started
raise RuntimeError("TDengine service did not start successfully") # TODO: handle this better?
def stopTaosService(self):
# can be called from both main thread or signal handler
print("Terminating TDengine service running as the sub process...")
# Linux will send Control-C generated SIGINT to the TDengine process already, ref: https://unix.stackexchange.com/questions/176235/fork-and-how-signals-are-delivered-to-processes
if not self.subProcess :
print("Process already stopped")
return
retCode = self.subProcess.poll()
if retCode : # valid return code, process ended
self.subProcess = None
else: # process still alive, let's interrupt it
print("Sub process still running, sending SIG_INT and waiting for it to stop...")
self.subProcess.send_signal(signal.SIGINT) # sub process should end, then IPC queue should end, causing IO thread to end
try :
self.subProcess.wait(10)
except subprocess.TimeoutExpired as err:
print("Time out waiting for TDengine service process to exit")
else:
print("TDengine service process terminated successfully from SIG_INT")
self.subProcess = None
if self.subProcess and (not self.subProcess.poll()):
print("Sub process is still running... pid = {}".format(self.subProcess.pid))
self.shouldStop = True
self.joinIoThread()
def run(self):
self.startTaosService()
# proc = subprocess.Popen(['echo', '"to stdout"'], # proc = subprocess.Popen(['echo', '"to stdout"'],
# stdout=subprocess.PIPE, # stdout=subprocess.PIPE,
# ) # )
# stdout_value = proc.communicate()[0] # stdout_value = proc.communicate()[0]
# print('\tstdout: {}'.format(repr(stdout_value))) # print('\tstdout: {}'.format(repr(stdout_value)))
while True : self._procIpcAll()
try:
line = q.get_nowait() # getting output at fast speed
except Empty:
# print('no output yet')
time.sleep(2.3) # wait only if there's no output
else: # got line
print(line)
# print("----end of iteration----")
if self.shouldStop:
print("Ending main Svc thread")
break
print("end of loop") print("End of loop reading from IPC queue")
self.joinIoThread() # should have started already
self.joinIoThread() print("SvcManager Run Finished")
print("Finished")
class ClientManager: class ClientManager:
def __init__(self): def __init__(self):
...@@ -1747,6 +1897,10 @@ class ClientManager: ...@@ -1747,6 +1897,10 @@ class ClientManager:
self._printLastNumbers() self._printLastNumbers()
def run(self): def run(self):
if gConfig.auto_start_service :
svcMgr = SvcManager()
svcMgr.startTaosService()
self._printLastNumbers() self._printLastNumbers()
dbManager = DbManager() # Regular function dbManager = DbManager() # Regular function
...@@ -1758,6 +1912,8 @@ class ClientManager: ...@@ -1758,6 +1912,8 @@ class ClientManager:
# print("exec stats: {}".format(self.tc.getExecStats())) # print("exec stats: {}".format(self.tc.getExecStats()))
# print("TC failed = {}".format(self.tc.isFailed())) # print("TC failed = {}".format(self.tc.isFailed()))
self.conclude() self.conclude()
if gConfig.auto_start_service :
svcMgr.stopTaosService()
# print("TC failed (2) = {}".format(self.tc.isFailed())) # print("TC failed (2) = {}".format(self.tc.isFailed()))
return 1 if self.tc.isFailed() else 0 # Linux return code: ref https://shapeshed.com/unix-exit-codes/ return 1 if self.tc.isFailed() else 0 # Linux return code: ref https://shapeshed.com/unix-exit-codes/
...@@ -1767,8 +1923,9 @@ class ClientManager: ...@@ -1767,8 +1923,9 @@ class ClientManager:
class MainExec: class MainExec:
STATUS_RUNNING = 1 STATUS_STARTING = 1
STATUS_STOPPING = 2 STATUS_RUNNING = 2
STATUS_STOPPING = 3
# STATUS_STOPPED = 3 # Not used yet # STATUS_STOPPED = 3 # Not used yet
@classmethod @classmethod
...@@ -1837,6 +1994,8 @@ def main(): ...@@ -1837,6 +1994,8 @@ def main():
''')) '''))
parser.add_argument('-a', '--auto-start-service', action='store_true',
help='Automatically start/stop the TDengine service (default: false)')
parser.add_argument('-c', '--connector-type', action='store', default='native', type=str, parser.add_argument('-c', '--connector-type', action='store', default='native', type=str,
help='Connector type to use: native, rest, or mixed (default: 10)') help='Connector type to use: native, rest, or mixed (default: 10)')
parser.add_argument('-d', '--debug', action='store_true', parser.add_argument('-d', '--debug', action='store_true',
......
...@@ -190,32 +190,31 @@ class TDDnode: ...@@ -190,32 +190,31 @@ class TDDnode:
"dnode:%d is deployed and configured by %s" % "dnode:%d is deployed and configured by %s" %
(self.index, self.cfgPath)) (self.index, self.cfgPath))
def start(self): def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__)) selfPath = os.path.dirname(os.path.realpath(__file__))
binPath = ""
if ("community" in selfPath): if ("community" in selfPath):
projPath = selfPath + "/../../../../" projPath = selfPath[:selfPath.find("community")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
binPath = os.path.join(root, "taosd")
break
else: else:
projPath = selfPath + "/../../../" projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files): for root, dirs, files in os.walk(projPath):
rootRealPath = os.path.dirname(os.path.realpath(root)) if ("taosd" in files):
if ("packaging" not in rootRealPath): rootRealPath = os.path.dirname(os.path.realpath(root))
binPath = os.path.join(root, "taosd") if ("packaging" not in rootRealPath):
break buildPath = root[:len(root)-len("/build/bin")]
break
if (binPath == ""): return buildPath
def start(self):
buildPath = self.getBuildPath()
if (buildPath == ""):
tdLog.exit("taosd not found!") tdLog.exit("taosd not found!")
else: else:
tdLog.info("taosd found in %s" % rootRealPath) tdLog.info("taosd found in %s" % buildPath)
binPath = buildPath + "/build/bin/taosd"
if self.deployed == 0: if self.deployed == 0:
tdLog.exit("dnode:%d is not deployed" % (self.index)) tdLog.exit("dnode:%d is not deployed" % (self.index))
......
...@@ -68,7 +68,7 @@ endi ...@@ -68,7 +68,7 @@ endi
if $data01 != 1 then if $data01 != 1 then
return -1 return -1
endi endi
if $data02 != 1 then if $data02 != NULL then
return -1 return -1
endi endi
...@@ -80,7 +80,7 @@ endi ...@@ -80,7 +80,7 @@ endi
if $data01 != 1 then if $data01 != 1 then
return -1 return -1
endi endi
if $data02 != 1 then if $data02 != NULL then
return -1 return -1
endi endi
......
...@@ -141,9 +141,9 @@ $res = 3 * $rowNum ...@@ -141,9 +141,9 @@ $res = 3 * $rowNum
if $data00 != $res then if $data00 != $res then
return -1 return -1
endi endi
if $data01 != @18-09-17 09:00:00.000@ then #if $data01 != @18-09-17 09:00:00.000@ then
return -1 # return -1
endi #endi
if $data02 != 3 then if $data02 != 3 then
return -1 return -1
endi endi
...@@ -154,9 +154,9 @@ $res = 3 * $rowNum ...@@ -154,9 +154,9 @@ $res = 3 * $rowNum
if $data10 != $res then if $data10 != $res then
return -1 return -1
endi endi
if $data11 != @18-09-17 09:00:00.000@ then #if $data11 != @18-09-17 09:00:00.000@ then
return -1 # return -1
endi #endi
if $data15 != 2 then if $data15 != 2 then
return -1 return -1
endi endi
...@@ -223,9 +223,9 @@ $res = 3 * $rowNum ...@@ -223,9 +223,9 @@ $res = 3 * $rowNum
if $data00 != $res then if $data00 != $res then
return -1 return -1
endi endi
if $data01 != @18-09-17 09:00:00.000@ then #if $data01 != @18-09-17 09:00:00.000@ then
return -1 # return -1
endi #endi
if $data02 != 3 then if $data02 != 3 then
return -1 return -1
endi endi
...@@ -236,9 +236,9 @@ $res = 3 * $rowNum ...@@ -236,9 +236,9 @@ $res = 3 * $rowNum
if $data10 != $res then if $data10 != $res then
return -1 return -1
endi endi
if $data11 != @18-09-17 09:00:00.000@ then #if $data11 != @18-09-17 09:00:00.000@ then
return -1 # return -1
endi #endi
if $data15 != 2 then if $data15 != 2 then
return -1 return -1
endi endi
......
cd ../../../debug; cmake .. cd ../../../debug; cmake ..
cd ../../../debug; make cd ../../../debug; make
#./test.sh -f general/alter/cached_schema_after_alter.sim ./test.sh -f general/alter/cached_schema_after_alter.sim
./test.sh -f general/alter/count.sim ./test.sh -f general/alter/count.sim
./test.sh -f general/alter/import.sim ./test.sh -f general/alter/import.sim
#./test.sh -f general/alter/insert1.sim ./test.sh -f general/alter/insert1.sim
./test.sh -f general/alter/insert2.sim ./test.sh -f general/alter/insert2.sim
./test.sh -f general/alter/metrics.sim ./test.sh -f general/alter/metrics.sim
./test.sh -f general/alter/table.sim ./test.sh -f general/alter/table.sim
...@@ -117,6 +117,7 @@ cd ../../../debug; make ...@@ -117,6 +117,7 @@ cd ../../../debug; make
./test.sh -f general/parser/import_commit3.sim ./test.sh -f general/parser/import_commit3.sim
./test.sh -f general/parser/insert_tb.sim ./test.sh -f general/parser/insert_tb.sim
./test.sh -f general/parser/first_last.sim ./test.sh -f general/parser/first_last.sim
# dyh is processing this script
#./test.sh -f general/parser/import_file.sim #./test.sh -f general/parser/import_file.sim
./test.sh -f general/parser/lastrow.sim ./test.sh -f general/parser/lastrow.sim
./test.sh -f general/parser/nchar.sim ./test.sh -f general/parser/nchar.sim
...@@ -142,15 +143,12 @@ cd ../../../debug; make ...@@ -142,15 +143,12 @@ cd ../../../debug; make
./test.sh -f general/parser/tags_dynamically_specifiy.sim ./test.sh -f general/parser/tags_dynamically_specifiy.sim
./test.sh -f general/parser/groupby.sim ./test.sh -f general/parser/groupby.sim
./test.sh -f general/parser/set_tag_vals.sim ./test.sh -f general/parser/set_tag_vals.sim
#./test.sh -f general/parser/slimit_alter_tags.sim ./test.sh -f general/parser/slimit_alter_tags.sim
./test.sh -f general/parser/join.sim ./test.sh -f general/parser/join.sim
./test.sh -f general/parser/join_multivnode.sim ./test.sh -f general/parser/join_multivnode.sim
./test.sh -f general/parser/binary_escapeCharacter.sim ./test.sh -f general/parser/binary_escapeCharacter.sim
./test.sh -f general/parser/bug.sim ./test.sh -f general/parser/bug.sim
#./test.sh -f general/parser/stream_on_sys.sim
./test.sh -f general/parser/stream.sim
./test.sh -f general/parser/repeatAlter.sim ./test.sh -f general/parser/repeatAlter.sim
#./test.sh -f general/parser/repeatStream.sim
./test.sh -f general/stable/disk.sim ./test.sh -f general/stable/disk.sim
./test.sh -f general/stable/dnode3.sim ./test.sh -f general/stable/dnode3.sim
...@@ -201,7 +199,7 @@ cd ../../../debug; make ...@@ -201,7 +199,7 @@ cd ../../../debug; make
./test.sh -f general/tag/bool.sim ./test.sh -f general/tag/bool.sim
./test.sh -f general/tag/change.sim ./test.sh -f general/tag/change.sim
./test.sh -f general/tag/column.sim ./test.sh -f general/tag/column.sim
#./test.sh -f general/tag/commit.sim ./test.sh -f general/tag/commit.sim
./test.sh -f general/tag/create.sim ./test.sh -f general/tag/create.sim
./test.sh -f general/tag/delete.sim ./test.sh -f general/tag/delete.sim
./test.sh -f general/tag/double.sim ./test.sh -f general/tag/double.sim
...@@ -309,11 +307,16 @@ cd ../../../debug; make ...@@ -309,11 +307,16 @@ cd ../../../debug; make
./test.sh -f unique/vnode/replica3_repeat.sim ./test.sh -f unique/vnode/replica3_repeat.sim
./test.sh -f unique/vnode/replica3_vgroup.sim ./test.sh -f unique/vnode/replica3_vgroup.sim
# stream still has bugs
#./test.sh -f general/parser/stream_on_sys.sim
#./test.sh -f general/parser/stream.sim
#./test.sh -f general/parser/repeatStream.sim
#./test.sh -f general/stream/new_stream.sim
./test.sh -f general/stream/metrics_1.sim ./test.sh -f general/stream/metrics_1.sim
./test.sh -f general/stream/metrics_del.sim ./test.sh -f general/stream/metrics_del.sim
./test.sh -f general/stream/metrics_n.sim ./test.sh -f general/stream/metrics_n.sim
./test.sh -f general/stream/metrics_replica1_vnoden.sim ./test.sh -f general/stream/metrics_replica1_vnoden.sim
#./test.sh -f general/stream/new_stream.sim
./test.sh -f general/stream/restart_stream.sim ./test.sh -f general/stream/restart_stream.sim
./test.sh -f general/stream/stream_1.sim ./test.sh -f general/stream/stream_1.sim
./test.sh -f general/stream/stream_2.sim ./test.sh -f general/stream/stream_2.sim
...@@ -334,6 +337,7 @@ cd ../../../debug; make ...@@ -334,6 +337,7 @@ cd ../../../debug; make
./test.sh -f unique/arbitrator/dn3_mn1_replica_change_dropDnod.sim ./test.sh -f unique/arbitrator/dn3_mn1_replica_change_dropDnod.sim
./test.sh -f unique/arbitrator/dn3_mn1_replica_change.sim ./test.sh -f unique/arbitrator/dn3_mn1_replica_change.sim
./test.sh -f unique/arbitrator/dn3_mn1_stopDnode_timeout.sim ./test.sh -f unique/arbitrator/dn3_mn1_stopDnode_timeout.sim
# lower the priority while file corruption
#./test.sh -f unique/arbitrator/dn3_mn1_vnode_change.sim #./test.sh -f unique/arbitrator/dn3_mn1_vnode_change.sim
#./test.sh -f unique/arbitrator/dn3_mn1_vnode_corruptFile_offline.sim #./test.sh -f unique/arbitrator/dn3_mn1_vnode_corruptFile_offline.sim
#./test.sh -f unique/arbitrator/dn3_mn1_vnode_corruptFile_online.sim #./test.sh -f unique/arbitrator/dn3_mn1_vnode_corruptFile_online.sim
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册