diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 83283cfdad048a932f9df5da0ecf82d4aeeb1280..125dc50d96d66f5d92ab084a3121a73e27a70756 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -45,7 +45,7 @@ extern bool tsPrintAuth; extern int64_t tsTickPerMin[3]; // multi-process -extern bool tsMultiProcess; +extern int32_t tsMultiProcess; extern int32_t tsMnodeShmSize; extern int32_t tsVnodeShmSize; extern int32_t tsQnodeShmSize; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 025ad0ca0d44597a32868bb46fe99b61fbea255f..56dea72e90045c372544995af1ce8e7f1d6e1fa6 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -38,7 +38,7 @@ bool tsEnableSlaveQuery = true; bool tsPrintAuth = false; // multi process -bool tsMultiProcess = false; +int32_t tsMultiProcess = 0; int32_t tsMnodeShmSize = TSDB_MAX_WAL_SIZE * 2 + 128; int32_t tsVnodeShmSize = TSDB_MAX_WAL_SIZE * 10 + 128; int32_t tsQnodeShmSize = TSDB_MAX_WAL_SIZE * 4 + 128; @@ -370,7 +370,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "slaveQuery", tsEnableSlaveQuery, 0) != 0) return -1; if (cfgAddBool(pCfg, "deadLockKillQuery", tsDeadLockKillQuery, 0) != 0) return -1; - if (cfgAddBool(pCfg, "multiProcess", tsMultiProcess, 0) != 0) return -1; + if (cfgAddInt32(pCfg, "multiProcess", tsMultiProcess, 0, 2, 0) != 0) return -1; if (cfgAddInt32(pCfg, "mnodeShmSize", tsMnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1; if (cfgAddInt32(pCfg, "vnodeShmSize", tsVnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1; if (cfgAddInt32(pCfg, "qnodeShmSize", tsQnodeShmSize, TSDB_MAX_WAL_SIZE + 128, INT32_MAX, 0) != 0) return -1; @@ -552,7 +552,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsRetrieveBlockingModel = cfgGetItem(pCfg, "retrieveBlockingModel")->bval; tsPrintAuth = cfgGetItem(pCfg, "printAuth")->bval; tsEnableSlaveQuery = cfgGetItem(pCfg, "slaveQuery")->bval; - tsDeadLockKillQuery = cfgGetItem(pCfg, "deadLockKillQuery")->bval; + tsDeadLockKillQuery = cfgGetItem(pCfg, "deadLockKillQuery")->i32; tsMultiProcess = cfgGetItem(pCfg, "multiProcess")->bval; tsMnodeShmSize = cfgGetItem(pCfg, "mnodeShmSize")->i32; diff --git a/source/dnode/mgmt/node_mgmt/src/dmNodes.c b/source/dnode/mgmt/node_mgmt/src/dmNodes.c index 97a32f87b4b6bb4990d989ce35ffdbc2e41eccfb..e9928bec59e381cf5534f48581ce201d3d744eab 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmNodes.c +++ b/source/dnode/mgmt/node_mgmt/src/dmNodes.c @@ -16,7 +16,7 @@ #define _DEFAULT_SOURCE #include "dmMgmt.h" -static int32_t dmInitParentProc(SMgmtWrapper *pWrapper) { +static int32_t dmCreateShm(SMgmtWrapper *pWrapper) { int32_t shmsize = tsMnodeShmSize; if (pWrapper->ntype == VNODE) { shmsize = tsVnodeShmSize; @@ -38,16 +38,10 @@ static int32_t dmInitParentProc(SMgmtWrapper *pWrapper) { return -1; } dInfo("node:%s, shm:%d is created, size:%d", pWrapper->name, pWrapper->proc.shm.id, shmsize); - - if (dmInitProc(pWrapper) != 0) { - dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr()); - return -1; - } - return 0; } -static int32_t dmNewNodeProc(SMgmtWrapper *pWrapper, EDndNodeType ntype) { +static int32_t dmNewProc(SMgmtWrapper *pWrapper, EDndNodeType ntype) { char tstr[8] = {0}; char *args[6] = {0}; snprintf(tstr, sizeof(tstr), "%d", ntype); @@ -69,21 +63,6 @@ static int32_t dmNewNodeProc(SMgmtWrapper *pWrapper, EDndNodeType ntype) { return 0; } -static int32_t dmRunParentProc(SMgmtWrapper *pWrapper) { - if (pWrapper->pDnode->rtype == NODE_END) { - dInfo("node:%s, should be started manually in child process", pWrapper->name); - } else { - if (dmNewNodeProc(pWrapper, pWrapper->ntype) != 0) { - return -1; - } - } - if (dmRunProc(&pWrapper->proc) != 0) { - dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr()); - return -1; - } - return 0; -} - int32_t dmOpenNode(SMgmtWrapper *pWrapper) { if (taosMkDir(pWrapper->path) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); @@ -108,34 +87,49 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) { pWrapper->deployed = true; } - if (InChildProc(pWrapper->proc.ptype)) { + if (InParentProc(pWrapper->proc.ptype)) { dDebug("node:%s, start to open", pWrapper->name); - if ((*pWrapper->func.openFp)(&input, &output) != 0) { - dError("node:%s, failed to open since %s", pWrapper->name, terrstr()); + if (dmCreateShm(pWrapper) != 0) { return -1; } - if (dmInitProc(pWrapper) != 0) { + if (dmWriteShmFile(pWrapper->path, pWrapper->name, &pWrapper->proc.shm) != 0) { return -1; } - if (dmRunProc(&pWrapper->proc) != 0) { - return -1; + + if (!OnlyInTestProc(pWrapper->proc.ptype)) { + if (dmInitProc(pWrapper) != 0) { + dError("node:%s, failed to init proc since %s", pWrapper->name, terrstr()); + return -1; + } + if (pWrapper->pDnode->rtype == NODE_END) { + dInfo("node:%s, should be started manually in child process", pWrapper->name); + } else { + if (dmNewProc(pWrapper, pWrapper->ntype) != 0) { + return -1; + } + } + if (dmRunProc(&pWrapper->proc) != 0) { + dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr()); + return -1; + } } - dDebug("node:%s, has been opened in child process", pWrapper->name); - pWrapper->deployed = true; + dDebug("node:%s, has been opened in parent process", pWrapper->name); } - if (InParentProc(pWrapper->proc.ptype)) { + if (InChildProc(pWrapper->proc.ptype)) { dDebug("node:%s, start to open", pWrapper->name); - if (dmInitParentProc(pWrapper) != 0) { + if ((*pWrapper->func.openFp)(&input, &output) != 0) { + dError("node:%s, failed to open since %s", pWrapper->name, terrstr()); return -1; } - if (dmWriteShmFile(pWrapper->path, pWrapper->name, &pWrapper->proc.shm) != 0) { + if (dmInitProc(pWrapper) != 0) { return -1; } - if (dmRunParentProc(pWrapper) != 0) { + if (dmRunProc(&pWrapper->proc) != 0) { return -1; } - dDebug("node:%s, has been opened in parent process", pWrapper->name); + dDebug("node:%s, has been opened in child process", pWrapper->name); + pWrapper->deployed = true; } if (output.pMgmt != NULL) { @@ -246,7 +240,7 @@ static void dmCloseNodes(SDnode *pDnode) { } static void dmWatchNodes(SDnode *pDnode) { - if (!InParentProc(pDnode->ptype)) return; + if (!OnlyInParentProc(pDnode->ptype)) return; if (pDnode->rtype == NODE_END) return; taosThreadMutexLock(&pDnode->mutex); @@ -255,12 +249,12 @@ static void dmWatchNodes(SDnode *pDnode) { SProc *proc = &pWrapper->proc; if (!pWrapper->required) continue; - if (!InParentProc(proc->ptype)) continue; + if (!OnlyInParentProc(proc->ptype)) continue; if (proc->pid <= 0 || !taosProcExist(proc->pid)) { dWarn("node:%s, process:%d is killed and needs to restart", pWrapper->name, proc->pid); dmCloseProcRpcHandles(&pWrapper->proc); - dmNewNodeProc(pWrapper, ntype); + dmNewProc(pWrapper, ntype); } } taosThreadMutexUnlock(&pDnode->mutex); diff --git a/source/dnode/mgmt/node_mgmt/src/dmProc.c b/source/dnode/mgmt/node_mgmt/src/dmProc.c index 73bab07c5822da2e87aa3ba68bdf4fe357bd678f..187d129fac2c18e550e2bb75bfbdfd63ccf319c4 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmProc.c +++ b/source/dnode/mgmt/node_mgmt/src/dmProc.c @@ -67,7 +67,7 @@ static SProcQueue *dmInitProcQueue(SProc *proc, char *ptr, int32_t size) { return NULL; } - if (InParentProc(proc->ptype) && !InChildProc(proc->ptype)) { + if (InParentProc(proc->ptype)) { if (dmInitProcMutex(queue) != 0) { return NULL; } @@ -185,8 +185,8 @@ static int32_t dmPushToProcQueue(SProc *proc, SProcQueue *queue, const char *pHe taosThreadMutexUnlock(&queue->mutex); tsem_post(&queue->sem); - dTrace("node:%s, push msg:%p %d cont:%p %d handle:%p, ftype:%s pos:%d remain:%d", queue->name, pHead, headLen, pBody, - bodyLen, (void *)handle, dmFuncStr(ftype), pos, queue->items); + dTrace("node:%s, push %s msg:%p %d cont:%p %d, pos:%d remain:%d", queue->name, dmFuncStr(ftype), pHead, headLen, + pBody, bodyLen, pos, queue->items); return 0; } @@ -269,13 +269,15 @@ static int32_t dmPopFromProcQueue(SProcQueue *queue, void **ppHead, int16_t *pHe *pBodyLen = rawBodyLen; *pFuncType = (EProcFuncType)ftype; - dTrace("node:%s, pop msg:%p %d body:%p %d, ftype:%s pos:%d remain:%d", queue->name, pHead, rawHeadLen, pBody, - rawBodyLen, dmFuncStr(ftype), pos, queue->items); + dTrace("node:%s, pop %s msg:%p %d body:%p %d, pos:%d remain:%d", queue->name, dmFuncStr(ftype), pHead, rawHeadLen, + pBody, rawBodyLen, pos, queue->items); return 1; } int32_t dmInitProc(struct SMgmtWrapper *pWrapper) { SProc *proc = &pWrapper->proc; + if (proc->name != NULL) return 0; + proc->wrapper = pWrapper; proc->name = pWrapper->name; @@ -319,7 +321,7 @@ static void *dmConsumChildQueue(void *param) { do { numOfMsgs = dmPopFromProcQueue(queue, &pHead, &headLen, &pBody, &bodyLen, &ftype); if (numOfMsgs == 0) { - dDebug("node:%s, get no msg from cueue and exit thread", proc->name); + dDebug("node:%s, get no msg from cqueue and exit thread", proc->name); break; } @@ -330,11 +332,10 @@ static void *dmConsumChildQueue(void *param) { } if (ftype != DND_FUNC_REQ) { - dFatal("node:%s, msg:%p from cqueue, invalid ftype:%d", proc->name, pHead, ftype); + dFatal("node:%s, get msg:%p from cqueue, invalid ftype:%d", proc->name, pHead, ftype); taosFreeQitem(pHead); rpcFreeCont(pBody); } else { - dTrace("node:%s, msg:%p from cueue", proc->name, pHead); pReq = pHead; pReq->rpcMsg.pCont = pBody; code = dmProcessNodeMsg(pWrapper, pReq); @@ -388,22 +389,22 @@ static void *dmConsumParentQueue(void *param) { if (ftype == DND_FUNC_RSP) { pRsp = pHead; pRsp->pCont = pBody; - dTrace("node:%s, rsp msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code, pRsp->handle); + dTrace("node:%s, get rsp msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code, pRsp->handle); dmRemoveProcRpcHandle(proc, pRsp->handle); rpcSendResponse(pRsp); } else if (ftype == DND_FUNC_REGIST) { pRsp = pHead; - dTrace("node:%s, regist msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code, pRsp->handle); + dTrace("node:%s, get regist msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code, pRsp->handle); rpcRegisterBrokenLinkArg(pRsp); rpcFreeCont(pBody); } else if (ftype == DND_FUNC_RELEASE) { pRsp = pHead; - dTrace("node:%s, release msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code, pRsp->handle); + dTrace("node:%s, get release msg:%p from pqueue, code:0x%04x handle:%p", proc->name, pRsp, code, pRsp->handle); dmRemoveProcRpcHandle(proc, pRsp->handle); rpcReleaseHandle(pRsp->handle, (int8_t)pRsp->code); rpcFreeCont(pBody); } else { - dFatal("node:%s, msg:%p get from pqueue, invalid ftype:%d", proc->name, pHead, ftype); + dFatal("node:%s, get msg:%p from pqueue, invalid ftype:%d", proc->name, pHead, ftype); rpcFreeCont(pBody); } @@ -441,16 +442,17 @@ int32_t dmRunProc(SProc *proc) { } void dmStopProc(SProc *proc) { + proc->stop = true; if (taosCheckPthreadValid(proc->pthread)) { dDebug("node:%s, start to join pthread:%" PRId64, proc->name, proc->pthread); - tsem_post(&proc->cqueue->sem); + tsem_post(&proc->pqueue->sem); taosThreadJoin(proc->pthread, NULL); taosThreadClear(&proc->pthread); } if (taosCheckPthreadValid(proc->cthread)) { dDebug("node:%s, start to join cthread:%" PRId64, proc->name, proc->cthread); - tsem_post(&proc->pqueue->sem); + tsem_post(&proc->cqueue->sem); taosThreadJoin(proc->cthread, NULL); taosThreadClear(&proc->cthread); } @@ -458,6 +460,7 @@ void dmStopProc(SProc *proc) { void dmCleanupProc(struct SMgmtWrapper *pWrapper) { SProc *proc = &pWrapper->proc; + if (proc->name == NULL) return; dDebug("node:%s, start to clean up proc", pWrapper->name); dmStopProc(proc); diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 0a2fb71e98db5cd7e27c75c1041c4e564662b639..e242bf2849b93ddab29a77bfe580a99d6e20b87f 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -77,6 +77,9 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { SMsgHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(msgType)]; SMgmtWrapper *pWrapper = NULL; + dTrace("msg:%s is received, handle:%p app:%p cont:%p len:%d code:0x%04x refId:%" PRId64, TMSG_INFO(msgType), + pRpc->handle, pRpc->ahandle, pRpc->pCont, pRpc->contLen, pRpc->code, pRpc->refId); + if (msgType == TDMT_DND_NET_TEST) { dmProcessNetTestReq(pDnode, pRpc); code = 0; @@ -110,18 +113,24 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { } else { pWrapper = &pDnode->wrappers[pHandle->defaultNtype]; if (pHandle->needCheckVgId) { - SMsgHead *pHead = pRpc->pCont; - int32_t vgId = ntohl(pHead->vgId); - if (vgId == QNODE_HANDLE) { - pWrapper = &pDnode->wrappers[QNODE]; - } else if (vgId == MNODE_HANDLE) { - pWrapper = &pDnode->wrappers[MNODE]; + if (pRpc->contLen > 0) { + SMsgHead *pHead = pRpc->pCont; + int32_t vgId = ntohl(pHead->vgId); + if (vgId == QNODE_HANDLE) { + pWrapper = &pDnode->wrappers[QNODE]; + } else if (vgId == MNODE_HANDLE) { + pWrapper = &pDnode->wrappers[MNODE]; + } else { + terrno = TSDB_CODE_INVALID_MSG; + goto _OVER; + } } else { + terrno = TSDB_CODE_INVALID_MSG_LEN; + goto _OVER; } } } - dTrace("msg:%s is received, handle:%p app:%p", TMSG_INFO(msgType), pRpc->handle, pRpc->ahandle); if (dmMarkWrapper(pWrapper) != 0) { goto _OVER; } else { @@ -138,7 +147,6 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { } if (InParentProc(pWrapper->proc.ptype)) { - dTrace("msg:%p, put into cqueue, handle:%p refId:%" PRId64, pMsg, pRpc->handle, pRpc->refId); code = dmPutToProcCQueue(&pWrapper->proc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen, (isReq && (pRpc->code == 0)) ? pRpc->handle : NULL, pRpc->refId, DND_FUNC_REQ); } else {