diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index cb59599d9adbd16252c8fc87f065dcaa9abfd662..6c3671a8d6068376b9bb63f3bd14fd56919183d9 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -50,7 +50,6 @@ typedef struct { PutToQueueFp queueFps[QUEUE_MAX]; GetQueueSizeFp qsizeFp; SendReqFp sendReqFp; - SendMnodeReqFp sendMnodeReqFp; SendRspFp sendRspFp; RegisterBrokenLinkArgFp registerBrokenLinkArgFp; ReleaseHandleFp releaseHandleFp; @@ -60,7 +59,6 @@ void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb); int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq); int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype); int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq); -int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq); void tmsgSendRsp(const SRpcMsg* pRsp); void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg); void tmsgReleaseHandle(void* handle, int8_t type); diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 5b88a9d6aff34bf3a151650f42c48e4ef325e81d..08ab63e55abb4498e058c8e014387c1046a76621 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -29,8 +29,7 @@ extern "C" { typedef struct SMnode SMnode; typedef struct { - int32_t dnodeId; - int64_t clusterId; + bool deploy; int8_t replica; int8_t selfIndex; SReplica replicas[TSDB_MAX_REPLICA]; diff --git a/include/os/osShm.h b/include/os/osShm.h index d26a99e2778e56ccc1d58b185cfa0ee7b19ebd89..61ffc0f6cc983229e5d35385909905b7af036731 100644 --- a/include/os/osShm.h +++ b/include/os/osShm.h @@ -26,7 +26,7 @@ typedef struct { void* ptr; } SShm; -int32_t taosCreateShm(SShm *pShm, int32_t shmsize) ; +int32_t taosCreateShm(SShm *pShm, int32_t key, int32_t shmsize) ; void taosDropShm(SShm *pShm); int32_t taosAttachShm(SShm *pShm); diff --git a/include/util/tprocess.h b/include/util/tprocess.h index 3a47450eecc5d56e346bb0664fc8dc9748118e68..bc0c8fe5a43f269c65aa75a4828bc678ef0d26fa 100644 --- a/include/util/tprocess.h +++ b/include/util/tprocess.h @@ -22,7 +22,7 @@ extern "C" { #endif -typedef enum { PROC_REQ, PROC_RSP, PROC_REG, PROC_RELEASE } ProcFuncType; +typedef enum { PROC_QUEUE, PROC_REQ, PROC_RSP, PROC_REGIST, PROC_RELEASE } ProcFuncType; typedef struct SProcQueue SProcQueue; typedef struct SProcObj SProcObj; @@ -53,7 +53,7 @@ void taosProcCleanup(SProcObj *pProc); int32_t taosProcRun(SProcObj *pProc); int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, ProcFuncType ftype); -int32_t taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, +void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, ProcFuncType ftype); #ifdef __cplusplus diff --git a/source/common/src/tmsgcb.c b/source/common/src/tmsgcb.c index e90634a604a04ff5e899b793bb1ec65318470002..cb5e2b07c1b7dd8864bd505bb35527868254a6f3 100644 --- a/source/common/src/tmsgcb.c +++ b/source/common/src/tmsgcb.c @@ -32,10 +32,6 @@ int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq) { return (*pMsgCb->sendReqFp)(pMsgCb->pWrapper, epSet, pReq); } -int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, SRpcMsg* pReq) { - return (*pMsgCb->sendMnodeReqFp)(pMsgCb->pWrapper, pReq); -} - void tmsgSendRsp(const SRpcMsg* pRsp) { return (*tsDefaultMsgCb.sendRspFp)(tsDefaultMsgCb.pWrapper, pRsp); } void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg) { diff --git a/source/dnode/mgmt/bm/src/bmInt.c b/source/dnode/mgmt/bm/src/bmInt.c index 4b87f4463c978072d372f6b44419d1f66d5317a3..4ca7afbb6d742b25fbe5256d72c6eb94cc23cbd0 100644 --- a/source/dnode/mgmt/bm/src/bmInt.c +++ b/source/dnode/mgmt/bm/src/bmInt.c @@ -19,12 +19,7 @@ static int32_t bmRequire(SMgmtWrapper *pWrapper, bool *required) { return dndReadFile(pWrapper, required); } static void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) { - SMsgCb msgCb = {0}; - msgCb.pWrapper = pMgmt->pWrapper; - msgCb.sendReqFp = dndSendReqToDnode; - msgCb.sendMnodeReqFp = dndSendReqToMnode; - msgCb.sendRspFp = dndSendRsp; - msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg; + SMsgCb msgCb = dndCreateMsgcb(pMgmt->pWrapper); pOption->msgCb = msgCb; } diff --git a/source/dnode/mgmt/main/exe/dndMain.c b/source/dnode/mgmt/main/exe/dndMain.c index 0d2ddbfbbc3f55268e4e3fa1d9263ec70ae3f03f..c6a109d62a547ccd6e17aee264ccd4eaf01ac6fd 100644 --- a/source/dnode/mgmt/main/exe/dndMain.c +++ b/source/dnode/mgmt/main/exe/dndMain.c @@ -30,7 +30,6 @@ static struct { } global = {0}; static void dndStopDnode(int signum, void *info, void *ctx) { - dInfo("system signal:%d received", signum); SDnode *pDnode = atomic_val_compare_exchange_ptr(&global.pDnode, 0, global.pDnode); if (pDnode != NULL) { dndHandleEvent(pDnode, DND_EVENT_STOP); @@ -41,8 +40,10 @@ static void dndSetSignalHandle() { taosSetSignal(SIGTERM, dndStopDnode); taosSetSignal(SIGHUP, dndStopDnode); taosSetSignal(SIGINT, dndStopDnode); + taosSetSignal(SIGTSTP, dndStopDnode); taosSetSignal(SIGABRT, dndStopDnode); taosSetSignal(SIGBREAK, dndStopDnode); + taosSetSignal(SIGQUIT, dndStopDnode); if (!tsMultiProcess) { } else if (global.ntype == DNODE || global.ntype == NODE_MAX) { @@ -72,7 +73,7 @@ static int32_t dndParseArgs(int32_t argc, char const *argv[]) { } else if (strcmp(argv[i], "-n") == 0) { global.ntype = atoi(argv[++i]); if (global.ntype <= DNODE || global.ntype > NODE_MAX) { - printf("'-n' range is [1-5], default is 0\n"); + printf("'-n' range is [1 - %d], default is 0\n", NODE_MAX - 1); return -1; } } else if (strcmp(argv[i], "-k") == 0) { diff --git a/source/dnode/mgmt/main/inc/dnd.h b/source/dnode/mgmt/main/inc/dnd.h index b9c760c980f214a82bf557bcd3a7196b2b37a9c6..255a739addd71aeb8db9731d985437e144772829 100644 --- a/source/dnode/mgmt/main/inc/dnd.h +++ b/source/dnode/mgmt/main/inc/dnd.h @@ -149,7 +149,7 @@ int32_t dndInitClient(SDnode *pDnode); void dndCleanupClient(SDnode *pDnode); int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg); int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); -int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pMsg); +int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pMsg); void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp); void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper); diff --git a/source/dnode/mgmt/main/src/dndExec.c b/source/dnode/mgmt/main/src/dndExec.c index b37893aa6f3dee977c9219b198a7c2d34b6174fa..8837da6d48fc2d995f8045dd070311fc34531e0f 100644 --- a/source/dnode/mgmt/main/src/dndExec.c +++ b/source/dnode/mgmt/main/src/dndExec.c @@ -45,7 +45,7 @@ int32_t dndOpenNode(SMgmtWrapper *pWrapper) { } void dndCloseNode(SMgmtWrapper *pWrapper) { - dDebug("node:%s, start to close", pWrapper->name); + dDebug("node:%s, mgmt start to close", pWrapper->name); pWrapper->required = false; taosWLockLatch(&pWrapper->latch); if (pWrapper->deployed) { @@ -62,7 +62,7 @@ void dndCloseNode(SMgmtWrapper *pWrapper) { taosProcCleanup(pWrapper->pProc); pWrapper->pProc = NULL; } - dDebug("node:%s, has been closed", pWrapper->name); + dDebug("node:%s, mgmt has been closed", pWrapper->name); } static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen, @@ -90,10 +90,10 @@ static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen, ProcFuncType ftype) { pMsg->pCont = pCont; - dTrace("msg:%p, get from parent queue, handle:%p app:%p", pMsg, pMsg->handle, pMsg->ahandle); + dTrace("msg:%p, get from parent queue, ftype:%d handle:%p, app:%p", pMsg, ftype, pMsg->handle, pMsg->ahandle); switch (ftype) { - case PROC_REG: + case PROC_REGIST: rpcRegisterBrokenLinkArg(pMsg); break; case PROC_RELEASE: @@ -101,11 +101,14 @@ static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t rpcFreeCont(pCont); break; case PROC_REQ: - // todo send to dnode dndSendReqToMnode(pWrapper, pMsg); - default: + // dndSendReq(pWrapper, (const SEpSet *)((char *)pMsg + sizeof(SRpcMsg)), pMsg); + break; + case PROC_RSP: dndSendRpcRsp(pWrapper, pMsg); break; + default: + break; } taosMemoryFree(pMsg); } @@ -180,6 +183,7 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) { while (1) { if (pDnode->event == DND_EVENT_STOP) { dInfo("dnode is about to stop"); + dndSetStatus(pDnode, DND_STAT_STOPPED); break; } taosMsleep(100); @@ -202,7 +206,7 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) { if (!pWrapper->required) continue; int32_t shmsize = 1024 * 1024 * 2; // size will be a configuration item - if (taosCreateShm(&pWrapper->shm, shmsize) != 0) { + if (taosCreateShm(&pWrapper->shm, n, shmsize) != 0) { terrno = TAOS_SYSTEM_ERROR(terrno); dError("node:%s, failed to create shm size:%d since %s", pWrapper->name, shmsize, terrstr()); return -1; @@ -255,6 +259,8 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) { while (1) { if (pDnode->event == DND_EVENT_STOP) { dInfo("dnode is about to stop"); + dndSetStatus(pDnode, DND_STAT_STOPPED); + for (ENodeType n = DNODE + 1; n < NODE_MAX; ++n) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; if (!pWrapper->required) continue; @@ -263,15 +269,6 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) { if (pWrapper->procId > 0 && taosProcExist(pWrapper->procId)) { dInfo("node:%s, send kill signal to the child process:%d", pWrapper->name, pWrapper->procId); taosKillProc(pWrapper->procId); - } - } - - for (ENodeType n = DNODE + 1; n < NODE_MAX; ++n) { - SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; - if (!pWrapper->required) continue; - if (pDnode->ntype == NODE_MAX) continue; - - if (pWrapper->procId > 0 && taosProcExist(pWrapper->procId)) { dInfo("node:%s, wait for child process:%d to stop", pWrapper->name, pWrapper->procId); taosWaitProc(pWrapper->procId); dInfo("node:%s, child process:%d is stopped", pWrapper->name, pWrapper->procId); @@ -331,6 +328,8 @@ static int32_t dndRunInChildProcess(SDnode *pDnode) { } } + dndSetStatus(pDnode, DND_STAT_RUNNING); + if (taosProcRun(pWrapper->pProc) != 0) { dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr()); return -1; @@ -340,11 +339,14 @@ static int32_t dndRunInChildProcess(SDnode *pDnode) { dndReportStartup(pDnode, "TDengine", "initialized successfully"); while (1) { if (pDnode->event == DND_EVENT_STOP) { - dInfo("dnode is about to stop"); + dInfo("%s is about to stop", pWrapper->name); + dndSetStatus(pDnode, DND_STAT_STOPPED); break; } taosMsleep(100); } + + return 0; } int32_t dndRun(SDnode *pDnode) { diff --git a/source/dnode/mgmt/main/src/dndInt.c b/source/dnode/mgmt/main/src/dndInt.c index 602ebc6b3cfcfa2177b511bd48b49ba68272ceca..6f6e21b983d0b41f4e8765b996a6ffd4dcf9f5bf 100644 --- a/source/dnode/mgmt/main/src/dndInt.c +++ b/source/dnode/mgmt/main/src/dndInt.c @@ -133,14 +133,6 @@ _OVER: void dndClose(SDnode *pDnode) { if (pDnode == NULL) return; - if (dndGetStatus(pDnode) == DND_STAT_STOPPED) { - dError("dnode is shutting down, data:%p", pDnode); - return; - } - - dInfo("start to close dnode, data:%p", pDnode); - dndSetStatus(pDnode, DND_STAT_STOPPED); - for (ENodeType n = 0; n < NODE_MAX; ++n) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; dndCloseNode(pWrapper); @@ -151,7 +143,6 @@ void dndClose(SDnode *pDnode) { } void dndHandleEvent(SDnode *pDnode, EDndEvent event) { - dInfo("dnode receive %s event, data:%p", dndEventStr(event), pDnode); if (event == DND_EVENT_STOP) { pDnode->event = event; } diff --git a/source/dnode/mgmt/main/src/dndTransport.c b/source/dnode/mgmt/main/src/dndTransport.c index 07ea0309a86f0dc58062b4464907de61017e00c7..d463f2ba7a137252c6c93529bacb4feca0b2c920 100644 --- a/source/dnode/mgmt/main/src/dndTransport.c +++ b/source/dnode/mgmt/main/src/dndTransport.c @@ -319,22 +319,6 @@ static int32_t dndSendRpcReq(STransMgmt *pMgmt, const SEpSet *pEpSet, SRpcMsg *p return 0; } -int32_t dndSendReqToDnode(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) { - if (pWrapper->procType != PROC_CHILD) { - SDnode *pDnode = pWrapper->pDnode; - if (dndGetStatus(pDnode) != DND_STAT_RUNNING) { - terrno = TSDB_CODE_DND_OFFLINE; - dError("failed to send rpc msg since %s, handle:%p", terrstr(), pReq->handle); - return -1; - } - return dndSendRpcReq(&pDnode->trans, pEpSet, pReq); - } else { - while (taosProcPutToParentQ(pWrapper->pProc, pReq, sizeof(SRpcMsg), pReq->pCont, pReq->contLen, PROC_REQ) != 0) { - taosMsleep(1); - } - } -} - int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pReq) { SDnode *pDnode = pWrapper->pDnode; STransMgmt *pTrans = &pDnode->trans; @@ -362,13 +346,37 @@ void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) { } } +int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) { + SDnode *pDnode = pWrapper->pDnode; + if (dndGetStatus(pDnode) != DND_STAT_RUNNING) { + terrno = TSDB_CODE_DND_OFFLINE; + dError("failed to send rpc msg since %s, handle:%p", terrstr(), pReq->handle); + return -1; + } + + if (pWrapper->procType != PROC_CHILD) { + return dndSendRpcReq(&pDnode->trans, pEpSet, pReq); + } else { + int32_t headLen = sizeof(SRpcMsg) + sizeof(SEpSet); + char *pHead = taosMemoryMalloc(headLen); + if (pHead == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + memcpy(pHead, pReq, sizeof(SRpcMsg)); + memcpy(pHead + sizeof(SRpcMsg), pEpSet, sizeof(SEpSet)); + + taosProcPutToParentQ(pWrapper->pProc, pReq, headLen, pReq->pCont, pReq->contLen, PROC_REQ); + taosMemoryFree(pHead); + return 0; + } +} + void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) { if (pWrapper->procType != PROC_CHILD) { dndSendRpcRsp(pWrapper, pRsp); } else { - while (taosProcPutToParentQ(pWrapper->pProc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_RSP) != 0) { - taosMsleep(1); - } + taosProcPutToParentQ(pWrapper->pProc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_RSP); } } @@ -376,9 +384,7 @@ void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { if (pWrapper->procType != PROC_CHILD) { rpcRegisterBrokenLinkArg(pMsg); } else { - while (taosProcPutToParentQ(pWrapper->pProc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, PROC_REG) != 0) { - taosMsleep(1); - } + taosProcPutToParentQ(pWrapper->pProc, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, PROC_REGIST); } } @@ -387,20 +393,17 @@ static void dndReleaseHandle(SMgmtWrapper *pWrapper, void *handle, int8_t type) rpcReleaseHandle(handle, type); } else { SRpcMsg msg = {.handle = handle, .code = type}; - while (taosProcPutToParentQ(pWrapper->pProc, &msg, sizeof(SRpcMsg), NULL, 0, PROC_RELEASE) != 0) { - taosMsleep(1); - } + taosProcPutToParentQ(pWrapper->pProc, &msg, sizeof(SRpcMsg), NULL, 0, PROC_RELEASE); } } SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper) { SMsgCb msgCb = { .pWrapper = pWrapper, + .sendReqFp = dndSendReq, + .sendRspFp = dndSendRsp, .registerBrokenLinkArgFp = dndRegisterBrokenLinkArg, .releaseHandleFp = dndReleaseHandle, - .sendMnodeReqFp = dndSendReqToMnode, - .sendReqFp = dndSendReqToDnode, - .sendRspFp = dndSendRsp, }; return msgCb; } \ No newline at end of file diff --git a/source/dnode/mgmt/mm/src/mmFile.c b/source/dnode/mgmt/mm/src/mmFile.c index e5cc0ce0871bfa398bd7c46f4c9260bb16e8f0de..76aba771cbd08271a8995f6d4a79ebb7b4ba235d 100644 --- a/source/dnode/mgmt/mm/src/mmFile.c +++ b/source/dnode/mgmt/mm/src/mmFile.c @@ -111,7 +111,7 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, bool deployed) { TdFilePtr pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno);; + terrno = TAOS_SYSTEM_ERROR(errno); dError("failed to write %s since %s", file, terrstr()); return -1; } @@ -145,7 +145,7 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, bool deployed) { snprintf(realfile, sizeof(realfile), "%s%smnode.json", pMgmt->path, TD_DIRSEP); if (taosRenameFile(file, realfile) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno);; + terrno = TAOS_SYSTEM_ERROR(errno); dError("failed to rename %s since %s", file, terrstr()); return -1; } diff --git a/source/dnode/mgmt/mm/src/mmInt.c b/source/dnode/mgmt/mm/src/mmInt.c index 3d15c9b9aeeb7768e20139463498885f0372ad3a..301ca598e6db31a4f771807d0b8c79f9738a79e7 100644 --- a/source/dnode/mgmt/mm/src/mmInt.c +++ b/source/dnode/mgmt/mm/src/mmInt.c @@ -39,20 +39,11 @@ static int32_t mmRequire(SMgmtWrapper *pWrapper, bool *required) { } static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { - SDnode *pDnode = pMgmt->pDnode; - pOption->dnodeId = pDnode->dnodeId; - pOption->clusterId = pDnode->clusterId; - - SMsgCb msgCb = {0}; - msgCb.pWrapper = pMgmt->pWrapper; + SMsgCb msgCb = dndCreateMsgcb(pMgmt->pWrapper); msgCb.queueFps[QUERY_QUEUE] = mmPutMsgToQueryQueue; msgCb.queueFps[READ_QUEUE] = mmPutMsgToReadQueue; msgCb.queueFps[WRITE_QUEUE] = mmPutMsgToWriteQueue; msgCb.queueFps[SYNC_QUEUE] = mmPutMsgToWriteQueue; - msgCb.sendReqFp = dndSendReqToDnode; - msgCb.sendMnodeReqFp = dndSendReqToMnode; - msgCb.sendRspFp = dndSendRsp; - msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg; pOption->msgCb = msgCb; } @@ -66,6 +57,7 @@ static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { pReplica->id = 1; pReplica->port = pDnode->serverPort; tstrncpy(pReplica->fqdn, pDnode->localFqdn, TSDB_FQDN_LEN); + pOption->deploy = true; pMgmt->selfIndex = pOption->selfIndex; pMgmt->replica = pOption->replica; @@ -77,6 +69,7 @@ static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { pOption->selfIndex = pMgmt->selfIndex; pOption->replica = pMgmt->replica; memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); + pOption->deploy = false; } static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate) { @@ -89,7 +82,7 @@ static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCre pReplica->id = pCreate->replicas[i].id; pReplica->port = pCreate->replicas[i].port; memcpy(pReplica->fqdn, pCreate->replicas[i].fqdn, TSDB_FQDN_LEN); - if (pReplica->id == pOption->dnodeId) { + if (pReplica->id == pMgmt->pDnode->dnodeId) { pOption->selfIndex = i; } } @@ -98,6 +91,7 @@ static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCre dError("failed to build mnode options since %s", terrstr()); return -1; } + pOption->deploy = true; pMgmt->selfIndex = pOption->selfIndex; pMgmt->replica = pOption->replica; @@ -225,9 +219,7 @@ int32_t mmOpenFromMsg(SMgmtWrapper *pWrapper, SDCreateMnodeReq *pReq) { return code; } -static int32_t mmOpen(SMgmtWrapper *pWrapper) { - return mmOpenFromMsg(pWrapper, NULL); -} +static int32_t mmOpen(SMgmtWrapper *pWrapper) { return mmOpenFromMsg(pWrapper, NULL); } static int32_t mmStart(SMgmtWrapper *pWrapper) { dDebug("mnode-mgmt start to run"); @@ -258,7 +250,7 @@ int32_t mmGetUserAuth(SMgmtWrapper *pWrapper, char *user, char *spi, char *encry } int32_t mmMonitorMnodeInfo(SMgmtWrapper *pWrapper, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo, - SMonGrantInfo *pGrantInfo) { + SMonGrantInfo *pGrantInfo) { SMnodeMgmt *pMgmt = pWrapper->pMgmt; return mndGetMonitorInfo(pMgmt->pMnode, pClusterInfo, pVgroupInfo, pGrantInfo); } diff --git a/source/dnode/mgmt/qm/src/qmInt.c b/source/dnode/mgmt/qm/src/qmInt.c index 11c80a29044391f63417d848d1f32e325671151d..8079859b96cb07ae8372db25b8c89b94a7a30a9b 100644 --- a/source/dnode/mgmt/qm/src/qmInt.c +++ b/source/dnode/mgmt/qm/src/qmInt.c @@ -19,15 +19,10 @@ static int32_t qmRequire(SMgmtWrapper *pWrapper, bool *required) { return dndReadFile(pWrapper, required); } static void qmInitOption(SQnodeMgmt *pMgmt, SQnodeOpt *pOption) { - SMsgCb msgCb = {0}; - msgCb.pWrapper = pMgmt->pWrapper; + SMsgCb msgCb = dndCreateMsgcb(pMgmt->pWrapper); msgCb.queueFps[QUERY_QUEUE] = qmPutMsgToQueryQueue; msgCb.queueFps[FETCH_QUEUE] = qmPutMsgToFetchQueue; msgCb.qsizeFp = qmGetQueueSize; - msgCb.sendReqFp = dndSendReqToDnode; - msgCb.sendMnodeReqFp = dndSendReqToMnode; - msgCb.sendRspFp = dndSendRsp; - msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg; pOption->msgCb = msgCb; } diff --git a/source/dnode/mgmt/sm/src/smInt.c b/source/dnode/mgmt/sm/src/smInt.c index a639fc76bb98514f4f18bfab2d8fc9a8bae7b679..6d2e2aaefd372d836e5ed8a171db3e28e27c9fe5 100644 --- a/source/dnode/mgmt/sm/src/smInt.c +++ b/source/dnode/mgmt/sm/src/smInt.c @@ -19,12 +19,7 @@ static int32_t smRequire(SMgmtWrapper *pWrapper, bool *required) { return dndReadFile(pWrapper, required); } static void smInitOption(SSnodeMgmt *pMgmt, SSnodeOpt *pOption) { - SMsgCb msgCb = {0}; - msgCb.pWrapper = pMgmt->pWrapper; - msgCb.sendReqFp = dndSendReqToDnode; - msgCb.sendMnodeReqFp = dndSendReqToMnode; - msgCb.sendRspFp = dndSendRsp; - msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg; + SMsgCb msgCb = dndCreateMsgcb(pMgmt->pWrapper); pOption->msgCb = msgCb; } diff --git a/source/dnode/mgmt/test/vnode/vnode.cpp b/source/dnode/mgmt/test/vnode/vnode.cpp index 39de533b733739ca67b0c2e63adb0167467e2f5f..40e7472d428d1ca8c028d4d500be52d0e41f0664 100644 --- a/source/dnode/mgmt/test/vnode/vnode.cpp +++ b/source/dnode/mgmt/test/vnode/vnode.cpp @@ -70,46 +70,6 @@ TEST_F(DndTestVnode, 01_Create_Vnode) { ASSERT_EQ(pRsp->code, TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED); } } - - { - SCreateVnodeReq createReq = {0}; - createReq.vgId = 2; - createReq.dnodeId = 3; - strcpy(createReq.db, "1.d1"); - createReq.dbUid = 9527; - createReq.vgVersion = 1; - createReq.cacheBlockSize = 16; - createReq.totalBlocks = 10; - createReq.daysPerFile = 10; - createReq.daysToKeep0 = 3650; - createReq.daysToKeep1 = 3650; - createReq.daysToKeep2 = 3650; - createReq.minRows = 100; - createReq.minRows = 4096; - createReq.commitTime = 3600; - createReq.fsyncPeriod = 3000; - createReq.walLevel = 1; - createReq.precision = 0; - createReq.compression = 2; - createReq.replica = 1; - createReq.quorum = 1; - createReq.update = 0; - createReq.cacheLastRow = 0; - createReq.selfIndex = 0; - for (int r = 0; r < createReq.replica; ++r) { - SReplica* pReplica = &createReq.replicas[r]; - pReplica->id = 1; - pReplica->port = 9527; - } - - int32_t contLen = tSerializeSCreateVnodeReq(NULL, 0, &createReq); - void* pReq = rpcMallocCont(contLen); - tSerializeSCreateVnodeReq(pReq, contLen, &createReq); - - SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_VNODE, pReq, contLen); - ASSERT_NE(pRsp, nullptr); - ASSERT_EQ(pRsp->code, TSDB_CODE_DND_VNODE_INVALID_OPTION); - } } TEST_F(DndTestVnode, 02_Alter_Vnode) { diff --git a/source/dnode/mgmt/vm/src/vmInt.c b/source/dnode/mgmt/vm/src/vmInt.c index b52c6253dc9e06026acb7a3656c691a46d1a89a3..d9ef7b5ae6dfdb0ab200845bbef75ac237be3004 100644 --- a/source/dnode/mgmt/vm/src/vmInt.c +++ b/source/dnode/mgmt/vm/src/vmInt.c @@ -128,16 +128,12 @@ static void *vmOpenVnodeFunc(void *param) { pMgmt->state.openVnodes, pMgmt->state.totalVnodes); dndReportStartup(pDnode, "open-vnodes", stepDesc); - SMsgCb msgCb = {0}; + SMsgCb msgCb = dndCreateMsgcb(pMgmt->pWrapper); msgCb.pWrapper = pMgmt->pWrapper; msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue; msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue; msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue; msgCb.qsizeFp = vmGetQueueSize; - msgCb.sendReqFp = dndSendReqToDnode; - msgCb.sendMnodeReqFp = dndSendReqToMnode; - msgCb.sendRspFp = dndSendRsp; - msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg; SVnodeCfg cfg = {.msgCb = msgCb, .pTfs = pMgmt->pTfs, .vgId = pCfg->vgId, .dbId = pCfg->dbUid}; SVnode *pImpl = vnodeOpen(pCfg->path, &cfg); if (pImpl == NULL) { diff --git a/source/dnode/mgmt/vm/src/vmMsg.c b/source/dnode/mgmt/vm/src/vmMsg.c index f00bb89354b5b9af161888fa9fcae95600cf5d07..c3ad74d246f4079e1d0ef4ed4e9156de4b36d347 100644 --- a/source/dnode/mgmt/vm/src/vmMsg.c +++ b/source/dnode/mgmt/vm/src/vmMsg.c @@ -68,12 +68,6 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { SWrapperCfg wrapperCfg = {0}; vmGenerateWrapperCfg(pMgmt, &createReq, &wrapperCfg); - if (createReq.dnodeId != pMgmt->pDnode->dnodeId) { - terrno = TSDB_CODE_DND_VNODE_INVALID_OPTION; - dDebug("vgId:%d, failed to create vnode since %s", createReq.vgId, terrstr()); - return -1; - } - SVnodeObj *pVnode = vmAcquireVnode(pMgmt, createReq.vgId); if (pVnode != NULL) { dDebug("vgId:%d, already exist", createReq.vgId); @@ -82,16 +76,12 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return -1; } - SMsgCb msgCb = {0}; + SMsgCb msgCb = dndCreateMsgcb(pMgmt->pWrapper); msgCb.pWrapper = pMgmt->pWrapper; msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue; msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue; msgCb.queueFps[APPLY_QUEUE] = vmPutMsgToApplyQueue; msgCb.qsizeFp = vmGetQueueSize; - msgCb.sendReqFp = dndSendReqToDnode; - msgCb.sendMnodeReqFp = dndSendReqToMnode; - msgCb.sendRspFp = dndSendRsp; - msgCb.registerBrokenLinkArgFp = dndRegisterBrokenLinkArg; vnodeCfg.msgCb = msgCb; vnodeCfg.pTfs = pMgmt->pTfs; diff --git a/source/dnode/mnode/impl/inc/mndCluster.h b/source/dnode/mnode/impl/inc/mndCluster.h index 0206695b8814ee83832aeddeed86b4738649b3b7..5b7bac4486492d4c852a49c4c706526092ff83ec 100644 --- a/source/dnode/mnode/impl/inc/mndCluster.h +++ b/source/dnode/mnode/impl/inc/mndCluster.h @@ -25,6 +25,7 @@ extern "C" { int32_t mndInitCluster(SMnode *pMnode); void mndCleanupCluster(SMnode *pMnode); int32_t mndGetClusterName(SMnode *pMnode, char *clusterName, int32_t len); +int64_t mndGetClusterId(SMnode *pMnode); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index f89e9d8fe0536f4848bd5798ce3b4c4fe806e1f6..1cb0c78aa536a9982940e4d5424a1dc9bc86072b 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -100,7 +100,6 @@ typedef struct { } SGrantInfo; typedef struct SMnode { - int32_t dnodeId; int64_t clusterId; int8_t replica; int8_t selfIndex; diff --git a/source/dnode/mnode/impl/src/mndCluster.c b/source/dnode/mnode/impl/src/mndCluster.c index dde7e1fe8fd946c0f9632dcb7273b89cbe246757..94e1efde61b7f9d5f2dab3969b8a87fb12d27a3d 100644 --- a/source/dnode/mnode/impl/src/mndCluster.c +++ b/source/dnode/mnode/impl/src/mndCluster.c @@ -17,7 +17,7 @@ #include "mndCluster.h" #include "mndShow.h" -#define TSDB_CLUSTER_VER_NUMBE 1 +#define TSDB_CLUSTER_VER_NUMBE 1 #define TSDB_CLUSTER_RESERVE_SIZE 64 static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster); @@ -61,6 +61,23 @@ int32_t mndGetClusterName(SMnode *pMnode, char *clusterName, int32_t len) { return 0; } +int64_t mndGetClusterId(SMnode *pMnode) { + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + int64_t clusterId = -1; + + while (1) { + SClusterObj *pCluster = NULL; + pIter = sdbFetch(pSdb, SDB_CLUSTER, pIter, (void **)&pCluster); + if (pIter == NULL) break; + + clusterId = pCluster->id; + sdbRelease(pSdb, pCluster); + } + + return clusterId; +} + static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) { terrno = TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 6c38d8626cdbb94cad232f695eb07d503eef98a2..cad89399a3a8b3326325889b53b84705fc628f7c 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -433,12 +433,6 @@ static int32_t mndProcessHeartBeatReq(SNodeMsg *pReq) { pHeartbeat->connId = htonl(pHeartbeat->connId); pHeartbeat->pid = htonl(pHeartbeat->pid); - SRpcConnInfo info = {0}; - if (rpcGetConnInfo(pReq->rpcMsg.handle, &info) != 0) { - mError("user:%s, connId:%d failed to process hb since %s", pReq->user, pHeartbeat->connId, terrstr()); - return -1; - } - SConnObj *pConn = mndAcquireConn(pMnode, pHeartbeat->connId); if (pConn == NULL) { pConn = mndCreateConn(pMnode, &info, pHeartbeat->pid, pHeartbeat->app, 0); diff --git a/source/dnode/mnode/impl/src/mndQnode.c b/source/dnode/mnode/impl/src/mndQnode.c index 4b19a26bc4f2dc7a6c039b551b17835d347951f0..85718e203717a684117bb2a21645652a785a9a3a 100644 --- a/source/dnode/mnode/impl/src/mndQnode.c +++ b/source/dnode/mnode/impl/src/mndQnode.c @@ -423,7 +423,7 @@ static int32_t mndProcessDropQnodeReq(SNodeMsg *pReq) { DROP_QNODE_OVER: if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { - mError("qnode:%d, failed to drop since %s", pMnode->dnodeId, terrstr()); + mError("qnode:%d, failed to drop since %s", dropReq.dnodeId, terrstr()); } mndReleaseQnode(pMnode, pObj); diff --git a/source/dnode/mnode/impl/src/mndSnode.c b/source/dnode/mnode/impl/src/mndSnode.c index 5e0d9fae9a4bccbd382355ad2897b99c13ea8929..4f24c6f7eb957992899f848143f87dbdd827e8ef 100644 --- a/source/dnode/mnode/impl/src/mndSnode.c +++ b/source/dnode/mnode/impl/src/mndSnode.c @@ -433,7 +433,7 @@ static int32_t mndProcessDropSnodeReq(SNodeMsg *pReq) { DROP_SNODE_OVER: if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { - mError("snode:%d, failed to drop since %s", pMnode->dnodeId, terrstr()); + mError("snode:%d, failed to drop since %s", dropReq.dnodeId, terrstr()); } mndReleaseSnode(pMnode, pObj); diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 67b7d6dd45f754a6378c3b25941b5118ce62f61c..5c3dd778e15cef24ebf734d883f5e6def9d805c5 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -187,7 +187,7 @@ static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCle return 0; } -static int32_t mndInitSteps(SMnode *pMnode) { +static int32_t mndInitSteps(SMnode *pMnode, bool deploy) { if (mndAllocStep(pMnode, "mnode-sdb", mndInitSdb, mndCleanupSdb) != 0) return -1; if (mndAllocStep(pMnode, "mnode-trans", mndInitTrans, mndCleanupTrans) != 0) return -1; if (mndAllocStep(pMnode, "mnode-cluster", mndInitCluster, mndCleanupCluster) != 0) return -1; @@ -210,7 +210,7 @@ static int32_t mndInitSteps(SMnode *pMnode) { if (mndAllocStep(pMnode, "mnode-infos", mndInitInfos, mndCleanupInfos) != 0) return -1; if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1; if (mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc) != 0) return -1; - if (pMnode->clusterId <= 0) { + if (deploy) { if (mndAllocStep(pMnode, "mnode-sdb-deploy", mndDeploySdb, NULL) != 0) return -1; } else { if (mndAllocStep(pMnode, "mnode-sdb-read", mndReadSdb, NULL) != 0) return -1; @@ -263,23 +263,15 @@ static int32_t mndExecSteps(SMnode *pMnode) { } } + pMnode->clusterId = mndGetClusterId(pMnode); return 0; } -static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { - pMnode->dnodeId = pOption->dnodeId; - pMnode->clusterId = pOption->clusterId; +static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { pMnode->replica = pOption->replica; pMnode->selfIndex = pOption->selfIndex; memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); pMnode->msgCb = pOption->msgCb; - - if (pMnode->dnodeId < 0 || pMnode->clusterId < 0) { - terrno = TSDB_CODE_MND_INVALID_OPTIONS; - return -1; - } - - return 0; } SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { @@ -294,6 +286,7 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { char timestr[24] = "1970-01-01 00:00:00.00"; (void)taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); + mndSetOptions(pMnode, pOption); pMnode->pSteps = taosArrayInit(24, sizeof(SMnodeStep)); if (pMnode->pSteps == NULL) { @@ -312,16 +305,7 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { return NULL; } - code = mndSetOptions(pMnode, pOption); - if (code != 0) { - code = terrno; - mError("failed to open mnode since %s", terrstr()); - mndClose(pMnode); - terrno = code; - return NULL; - } - - code = mndInitSteps(pMnode); + code = mndInitSteps(pMnode, pOption->deploy); if (code != 0) { code = terrno; mError("failed to open mnode since %s", terrstr()); diff --git a/source/os/src/osProc.c b/source/os/src/osProc.c index 6b52fa30be2b7d1a5318388ce036d09b7b690f21..1cdd41ad78e448caed56a412b340399f624ba4a5 100644 --- a/source/os/src/osProc.c +++ b/source/os/src/osProc.c @@ -23,9 +23,9 @@ int32_t taosNewProc(char **args) { int32_t pid = fork(); if (pid == 0) { args[0] = tsProcPath; - close(STDIN_FILENO); + // close(STDIN_FILENO); close(STDOUT_FILENO); - close(STDERR_FILENO); + // close(STDERR_FILENO); return execvp(tsProcPath, args); } else { return pid; @@ -33,7 +33,7 @@ int32_t taosNewProc(char **args) { } void taosWaitProc(int32_t pid) { - int32_t status = 0; + int32_t status = -1; waitpid(pid, &status, 0); } diff --git a/source/os/src/osShm.c b/source/os/src/osShm.c index ba184c1f5d4ec79472919098aab8facc66ba07f8..bf784f14accbacbd7a4c6e6c1dcbd085239b3f09 100644 --- a/source/os/src/osShm.c +++ b/source/os/src/osShm.c @@ -17,10 +17,10 @@ #define _DEFAULT_SOURCE #include "os.h" -int32_t taosCreateShm(SShm* pShm, int32_t shmsize) { +int32_t taosCreateShm(SShm* pShm, int32_t key, int32_t shmsize) { pShm->id = -1; - int32_t shmid = shmget(0X95279527, shmsize, IPC_CREAT | 0600); + int32_t shmid = shmget(0X95270000 + key, shmsize, IPC_CREAT | 0600); if (shmid < 0) { return -1; } diff --git a/source/os/src/osSignal.c b/source/os/src/osSignal.c index d4e6cb3318f788e1173c1f25192c59d731e76f37..1d7fa517e58c873a4fbfb1d66792a971945370a6 100644 --- a/source/os/src/osSignal.c +++ b/source/os/src/osSignal.c @@ -59,7 +59,7 @@ void taosSetSignal(int32_t signum, FSignalHandler sigfp) { struct sigaction act; memset(&act, 0, sizeof(act)); #if 1 - act.sa_flags = SA_SIGINFO; + act.sa_flags = SA_SIGINFO | SA_RESTART; act.sa_sigaction = (FLinuxSignalHandler)sigfp; #else act.sa_handler = sigfp; diff --git a/source/util/src/tprocess.c b/source/util/src/tprocess.c index 1d41bd4a48e616cf16930ffb770b59abdec2785b..600413068c5560642f4689be671e35bd949c089c 100644 --- a/source/util/src/tprocess.c +++ b/source/util/src/tprocess.c @@ -434,7 +434,9 @@ int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, return taosProcQueuePush(pProc->pChildQueue, pHead, headLen, pBody, bodyLen, ftype); } -int32_t taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, - ProcFuncType ftype) { - return taosProcQueuePush(pProc->pParentQueue, pHead, headLen, pBody, bodyLen, ftype); +void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, + ProcFuncType ftype) { + while (taosProcQueuePush(pProc->pParentQueue, pHead, headLen, pBody, bodyLen, ftype) != 0) { + taosMsleep(1); + } } diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 5c503c39bb82cafb07b955f9574aae4f3a46ad48..d8fbfce55f699a7e834e982d47f1e7ed84271f04 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -52,4 +52,10 @@ ./test.sh -f tsim/stable/values.sim ./test.sh -f tsim/stable/vnode3.sim + +# --- for multi process mode +./test.sh -f tsim/user/basic1.sim -m +./test.sh -f tsim/stable/vnode3.sim -m +./test.sh -f tsim/tmq/basic.sim -m + #======================b1-end=============== diff --git a/tests/script/sh/deploy.sh b/tests/script/sh/deploy.sh index 1bbfccf98946c9f16cc06e285255f92eeff504bf..38b6d9aadbbfbc8ad3e0cd7c37f56961e4f3f25c 100755 --- a/tests/script/sh/deploy.sh +++ b/tests/script/sh/deploy.sh @@ -5,18 +5,12 @@ set +e echo "Executing deploy.sh" -if [ $# != 4 ]; then - echo "argument list need input : " - echo " -n nodeName" - echo " -i nodePort" - exit 1 -fi - UNAME_BIN=`which uname` OS_TYPE=`$UNAME_BIN` NODE_NAME= NODE= -while getopts "n:i:" arg +MULTIPROCESS=0 +while getopts "n:i:m" arg do case $arg in n) @@ -25,6 +19,9 @@ do i) NODE=$OPTARG ;; + m) + MULTIPROCESS=1 + ;; ?) echo "unkonw argument" ;; @@ -145,5 +142,5 @@ echo "statusInterval 1" >> $TAOS_CFG echo "asyncLog 0" >> $TAOS_CFG echo "locale en_US.UTF-8" >> $TAOS_CFG echo "telemetryReporting 0" >> $TAOS_CFG -echo "multiProcess 0" >> $TAOS_CFG +echo "multiProcess ${MULTIPROCESS}" >> $TAOS_CFG echo " " >> $TAOS_CFG diff --git a/tests/script/test.sh b/tests/script/test.sh index f89b9fb1a2fa79a4582e764b4ea64d9b03a099d5..8b77575e1845d4bff7b62d6b5351263912a34a5d 100755 --- a/tests/script/test.sh +++ b/tests/script/test.sh @@ -7,7 +7,6 @@ ################################################## set +e -#set -x FILE_NAME= RELEASE=0 @@ -16,7 +15,8 @@ VALGRIND=0 UNIQUE=0 UNAME_BIN=`which uname` OS_TYPE=`$UNAME_BIN` -while getopts "f:avu" arg +MULTIPROCESS=0 +while getopts "f:avum" arg do case $arg in f) @@ -28,6 +28,9 @@ do u) UNIQUE=1 ;; + m) + MULTIPROCESS=1 + ;; ?) echo "unknow argument" ;; @@ -125,8 +128,13 @@ if [ -n "$FILE_NAME" ]; then echo valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=${CODE_DIR}/../script/valgrind.log $PROGRAM -c $CFG_DIR -f $FILE_NAME valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=${CODE_DIR}/../script/valgrind.log $PROGRAM -c $CFG_DIR -f $FILE_NAME else - echo "ExcuteCmd:" $PROGRAM -c $CFG_DIR -f $FILE_NAME - $PROGRAM -c $CFG_DIR -f $FILE_NAME + if [[ $MULTIPROCESS -eq 1 ]];then + echo "ExcuteCmd(multiprocess):" $PROGRAM -m -c $CFG_DIR -f $FILE_NAME + $PROGRAM -m -c $CFG_DIR -f $FILE_NAME + else + echo "ExcuteCmd(singleprocess):" $PROGRAM -c $CFG_DIR -f $FILE_NAME + $PROGRAM -c $CFG_DIR -f $FILE_NAME + fi fi else echo "ExcuteCmd:" $PROGRAM -c $CFG_DIR -f basicSuite.sim diff --git a/tests/tsim/inc/simInt.h b/tests/tsim/inc/simInt.h index 1e2190e3086af93e0e1dccd4bc1fa02a65c48635..a667139de1934fab479c8ed2c3d857b7ee111dea 100644 --- a/tests/tsim/inc/simInt.h +++ b/tests/tsim/inc/simInt.h @@ -155,6 +155,7 @@ extern int32_t simScriptSucced; extern int32_t simDebugFlag; extern char simScriptDir[]; extern bool abortExecution; +extern bool useMultiProcess; SScript *simParseScript(char *fileName); SScript *simProcessCallOver(SScript *script); diff --git a/tests/tsim/src/simExe.c b/tests/tsim/src/simExe.c index 855705e904060f1646799b54d0f670da2fb36dd7..9a0b48197a9d38a8da7aafe9952ab7a87ddd51d8 100644 --- a/tests/tsim/src/simExe.c +++ b/tests/tsim/src/simExe.c @@ -305,25 +305,24 @@ bool simExecuteRunBackCmd(SScript *script, char *option) { return true; } -void simReplaceShToBat(char *dst) { - char *sh = strstr(dst, ".sh"); - if (sh != NULL) { +void simReplaceStr(char *buf, char *src, char *dst) { + char *begin = strstr(buf, src); + if (begin != NULL) { + int32_t srcLen = (int32_t)strlen(src); int32_t dstLen = (int32_t)strlen(dst); - char *end = dst + dstLen; - *(end + 1) = 0; + int32_t interval = (dstLen - srcLen); + int32_t remainLen = (int32_t)strlen(buf); + char *end = buf + remainLen; + *(end + interval) = 0; - for (char *p = end; p >= sh; p--) { - *(p + 1) = *p; + for (char *p = end; p >= begin; p--) { + *(p + interval) = *p; } - sh[0] = '.'; - sh[1] = 'b'; - sh[2] = 'a'; - sh[3] = 't'; - sh[4] = ' '; + memcpy(begin, dst, dstLen); } - simDebug("system cmd is %s", dst); + simInfo("system cmd is %s", buf); } bool simExecuteSystemCmd(SScript *script, char *option) { @@ -334,9 +333,13 @@ bool simExecuteSystemCmd(SScript *script, char *option) { simVisuallizeOption(script, option, buf + strlen(buf)); #else sprintf(buf, "%s%s", simScriptDir, option); - simReplaceShToBat(buf); + simReplaceStr(buf, ".sh", ".bat"); #endif + if (useMultiProcess) { + simReplaceStr(buf, "deploy.sh", "deploy.sh -m"); + } + simLogSql(buf, true); int32_t code = system(buf); int32_t repeatTimes = 0; diff --git a/tests/tsim/src/simMain.c b/tests/tsim/src/simMain.c index b57299e7524cb785e07bc7de8ded8302e32fffc8..8898f1b201da5e41db81982a8261a2d792888912 100644 --- a/tests/tsim/src/simMain.c +++ b/tests/tsim/src/simMain.c @@ -18,6 +18,7 @@ bool simExecSuccess = false; bool abortExecution = false; +bool useMultiProcess = false; void simHandleSignal(int32_t signo, void *sigInfo, void *context) { simSystemCleanUp(); @@ -32,6 +33,8 @@ int32_t main(int32_t argc, char *argv[]) { tstrncpy(configDir, argv[++i], 128); } else if (strcmp(argv[i], "-f") == 0 && i < argc - 1) { strcpy(scriptFile, argv[++i]); + } else if (strcmp(argv[i], "-m") == 0) { + useMultiProcess = true; } else { printf("usage: %s [options] \n", argv[0]); printf(" [-c config]: config directory, default is: %s\n", configDir);