From 359482be730c6d605a2a4f1012564edde8bf2b73 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 15 Mar 2022 14:03:30 +0800 Subject: [PATCH] shm --- include/os/osEnv.h | 1 + source/dnode/mgmt/container/src/dndInt.c | 3 +++ source/dnode/mgmt/container/src/dndNode.c | 11 +++++---- .../dnode/mgmt/container/src/dndTransport.c | 13 +++++----- source/dnode/mgmt/dnode/src/dmFile.c | 24 +++++++++---------- source/dnode/mgmt/dnode/src/dmInt.c | 4 ++-- source/dnode/mgmt/dnode/src/dmMsg.c | 8 +++---- source/dnode/mgmt/dnode/src/dmWorker.c | 19 +++++++++------ source/dnode/mgmt/main/src/dndMain.c | 1 + source/os/src/osEnv.c | 2 ++ 10 files changed, 51 insertions(+), 35 deletions(-) diff --git a/include/os/osEnv.h b/include/os/osEnv.h index 1426ba87f6..ebf4c360dd 100644 --- a/include/os/osEnv.h +++ b/include/os/osEnv.h @@ -45,6 +45,7 @@ extern SDiskSpace tsTempSpace; void osInit(); void osUpdate(); +void osCleanup(); bool osLogSpaceAvailable(); void osSetTimezone(const char *timezone); diff --git a/source/dnode/mgmt/container/src/dndInt.c b/source/dnode/mgmt/container/src/dndInt.c index 17757a75d5..1306c862bc 100644 --- a/source/dnode/mgmt/container/src/dndInt.c +++ b/source/dnode/mgmt/container/src/dndInt.c @@ -19,6 +19,7 @@ static int8_t once = DND_ENV_INIT; int32_t dndInit() { + dDebug("start to init dnode env"); if (atomic_val_compare_exchange_8(&once, DND_ENV_INIT, DND_ENV_READY) != DND_ENV_INIT) { terrno = TSDB_CODE_REPEAT_INIT; dError("failed to init dnode env since %s", terrstr()); @@ -51,6 +52,7 @@ int32_t dndInit() { } void dndCleanup() { + dDebug("start to cleanup dnode env"); if (atomic_val_compare_exchange_8(&once, DND_ENV_READY, DND_ENV_CLEANUP) != DND_ENV_READY) { dError("dnode env is already cleaned up"); return; @@ -121,5 +123,6 @@ TdFilePtr dndCheckRunning(char *dataDir) { return NULL; } + dDebug("file:%s is locked", filepath); return pFile; } diff --git a/source/dnode/mgmt/container/src/dndNode.c b/source/dnode/mgmt/container/src/dndNode.c index 6b41f4b5d2..db8785662d 100644 --- a/source/dnode/mgmt/container/src/dndNode.c +++ b/source/dnode/mgmt/container/src/dndNode.c @@ -45,7 +45,11 @@ static bool dndRequireNode(SMgmtWrapper *pMgmt) { static int32_t dndOpenNode(SMgmtWrapper *pWrapper) { return (*pWrapper->fp.openFp)(pWrapper); } -static void dndCloseNode(SMgmtWrapper *pWrapper) { (*pWrapper->fp.closeFp)(pWrapper); } +static void dndCloseNode(SMgmtWrapper *pWrapper) { + if (pWrapper->required) { + (*pWrapper->fp.closeFp)(pWrapper); + } +} static void dndClearMemory(SDnode *pDnode) { for (ENodeType n = 0; n < NODE_MAX; ++n) { @@ -243,7 +247,7 @@ int32_t dndRun(SDnode *pDnode) { while (1) { if (pDnode->event == DND_EVENT_STOP) { - dInfo("dnode object receive stop event"); + dInfo("dnode is about to stop"); break; } taosMsleep(100); @@ -302,8 +306,7 @@ void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) { goto _OVER; } - dTrace("msg:%p, is created, app:%p RPC:%p user:%s, processd by %s", pMsg, pRpc->ahandle, pRpc->handle, pMsg->user, - pWrapper->name); + dTrace("msg:%p, is created, user:%s", pMsg, pMsg->user); code = (*msgFp)(pWrapper, pMsg); _OVER: diff --git a/source/dnode/mgmt/container/src/dndTransport.c b/source/dnode/mgmt/container/src/dndTransport.c index 81b6818fbc..b6db12d62d 100644 --- a/source/dnode/mgmt/container/src/dndTransport.c +++ b/source/dnode/mgmt/container/src/dndTransport.c @@ -30,18 +30,18 @@ static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { if (dndGetStatus(pDnode) == DND_STAT_STOPPED) { if (pRsp == NULL || pRsp->pCont == NULL) return; - dTrace("RPC %p, rsp:%s ignored since dnode exiting, app:%p", pRsp->handle, TMSG_INFO(msgType), pRsp->ahandle); + dTrace("rsp:%s ignored since dnode exiting, app:%p", TMSG_INFO(msgType), pRsp->ahandle); rpcFreeCont(pRsp->pCont); return; } SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)]; if (pHandle->msgFp != NULL) { - dTrace("RPC %p, rsp:%s will be processed by %s, code:0x%x app:%p", pRsp->handle, TMSG_INFO(msgType), - pHandle->pWrapper->name, pRsp->code & 0XFFFF, pRsp->ahandle); + dTrace("rsp:%s will be processed by %s, code:0x%x app:%p", TMSG_INFO(msgType), pHandle->pWrapper->name, + pRsp->code & 0XFFFF, pRsp->ahandle); dndProcessRpcMsg(pHandle->pWrapper, pRsp, pEpSet); } else { - dError("RPC %p, rsp:%s not processed, app:%p", pRsp->handle, TMSG_INFO(msgType), pRsp->ahandle); + dError("rsp:%s not processed, app:%p", TMSG_INFO(msgType), pRsp->ahandle); rpcFreeCont(pRsp->pCont); } } @@ -258,12 +258,13 @@ int32_t dndInitMsgHandle(SDnode *pDnode) { SMsgHandle *pHandle = &pMgmt->msgHandles[msgIndex]; if (pHandle->msgFp != NULL) { - dError("msg:%s, has multiple process nodes, prev node:%s, curr node:%s", tMsgInfo[msgIndex], + dError("msg:%s has multiple process nodes, prev node:%s, curr node:%s", tMsgInfo[msgIndex], pHandle->pWrapper->name, pWrapper->name); return -1; } else { - dDebug("msg:%s, will be processed by node:%s", tMsgInfo[msgIndex], pWrapper->name); + dTrace("msg:%s will be processed by %s", tMsgInfo[msgIndex], pWrapper->name); pHandle->msgFp = msgFp; + pHandle->pWrapper = pWrapper; } } } diff --git a/source/dnode/mgmt/dnode/src/dmFile.c b/source/dnode/mgmt/dnode/src/dmFile.c index 43bbcfd946..0794824ac6 100644 --- a/source/dnode/mgmt/dnode/src/dmFile.c +++ b/source/dnode/mgmt/dnode/src/dmFile.c @@ -16,9 +16,9 @@ #define _DEFAULT_SOURCE #include "dmFile.h" -static void dndPrintDnodes(SDnodeMgmt *pMgmt); -static bool dndIsEpChanged(SDnodeMgmt *pMgmt, const char *ep); -static void dndResetDnodes(SDnodeMgmt *pMgmt, SArray *pDnodeEps); +static void dmPrintDnodes(SDnodeMgmt *pMgmt); +static bool dmIsEpChanged(SDnodeMgmt *pMgmt, const char *ep); +static void dmResetDnodes(SDnodeMgmt *pMgmt, SArray *pDnodeEps); int32_t dmReadFile(SDnodeMgmt *pMgmt) { int32_t code = TSDB_CODE_DND_DNODE_READ_FILE_ERROR; @@ -130,14 +130,14 @@ int32_t dmReadFile(SDnodeMgmt *pMgmt) { code = 0; dInfo("succcessed to read file %s", file); - dndPrintDnodes(pMgmt); + dmPrintDnodes(pMgmt); PRASE_DNODE_OVER: if (content != NULL) free(content); if (root != NULL) cJSON_Delete(root); if (pFile != NULL) taosCloseFile(&pFile); - if (dndIsEpChanged(pMgmt, pMgmt->pDnode->cfg.localEp)) { + if (dmIsEpChanged(pMgmt, pMgmt->pDnode->cfg.localEp)) { dError("localEp %s different with %s and need reconfigured", pMgmt->pDnode->cfg.localEp, file); return -1; } @@ -149,7 +149,7 @@ PRASE_DNODE_OVER: taosArrayPush(pMgmt->pDnodeEps, &dnodeEp); } - dndResetDnodes(pMgmt, pMgmt->pDnodeEps); + dmResetDnodes(pMgmt, pMgmt->pDnodeEps); terrno = 0; return 0; @@ -218,12 +218,12 @@ void dmUpdateDnodeEps(SDnodeMgmt *pMgmt, SArray *pDnodeEps) { int32_t numOfEpsOld = (int32_t)taosArrayGetSize(pMgmt->pDnodeEps); if (numOfEps != numOfEpsOld) { - dndResetDnodes(pMgmt, pDnodeEps); + dmResetDnodes(pMgmt, pDnodeEps); dmWriteFile(pMgmt); } else { int32_t size = numOfEps * sizeof(SDnodeEp); if (memcmp(pMgmt->pDnodeEps->pData, pDnodeEps->pData, size) != 0) { - dndResetDnodes(pMgmt, pDnodeEps); + dmResetDnodes(pMgmt, pDnodeEps); dmWriteFile(pMgmt); } } @@ -231,7 +231,7 @@ void dmUpdateDnodeEps(SDnodeMgmt *pMgmt, SArray *pDnodeEps) { taosWUnLockLatch(&pMgmt->latch); } -static void dndResetDnodes(SDnodeMgmt *pMgmt, SArray *pDnodeEps) { +static void dmResetDnodes(SDnodeMgmt *pMgmt, SArray *pDnodeEps) { if (pMgmt->pDnodeEps != pDnodeEps) { SArray *tmp = pMgmt->pDnodeEps; pMgmt->pDnodeEps = taosArrayDup(pDnodeEps); @@ -259,10 +259,10 @@ static void dndResetDnodes(SDnodeMgmt *pMgmt, SArray *pDnodeEps) { taosHashPut(pMgmt->dnodeHash, &pDnodeEp->id, sizeof(int32_t), pDnodeEp, sizeof(SDnodeEp)); } - dndPrintDnodes(pMgmt); + dmPrintDnodes(pMgmt); } -static void dndPrintDnodes(SDnodeMgmt *pMgmt) { +static void dmPrintDnodes(SDnodeMgmt *pMgmt) { int32_t numOfEps = (int32_t)taosArrayGetSize(pMgmt->pDnodeEps); dDebug("print dnode ep list, num:%d", numOfEps); for (int32_t i = 0; i < numOfEps; i++) { @@ -271,7 +271,7 @@ static void dndPrintDnodes(SDnodeMgmt *pMgmt) { } } -static bool dndIsEpChanged(SDnodeMgmt *pMgmt, const char *ep) { +static bool dmIsEpChanged(SDnodeMgmt *pMgmt, const char *ep) { bool changed = false; taosRLockLatch(&pMgmt->latch); diff --git a/source/dnode/mgmt/dnode/src/dmInt.c b/source/dnode/mgmt/dnode/src/dmInt.c index 092280345e..27d4c5a3f9 100644 --- a/source/dnode/mgmt/dnode/src/dmInt.c +++ b/source/dnode/mgmt/dnode/src/dmInt.c @@ -141,11 +141,11 @@ int32_t dmInit(SMgmtWrapper *pWrapper) { } pWrapper->pMgmt = pMgmt; + dInfo("dnode-mgmt is initialized"); + dndSetStatus(pDnode, DND_STAT_RUNNING); dmSendStatusReq(pMgmt); dndReportStartup(pDnode, "TDengine", "initialized successfully"); - - dInfo("dnode-mgmt is initialized"); return 0; } diff --git a/source/dnode/mgmt/dnode/src/dmMsg.c b/source/dnode/mgmt/dnode/src/dmMsg.c index d7b75545aa..be70a42101 100644 --- a/source/dnode/mgmt/dnode/src/dmMsg.c +++ b/source/dnode/mgmt/dnode/src/dmMsg.c @@ -54,11 +54,11 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = (void *)9527}; pMgmt->statusSent = 1; - dTrace("pDnode:%p, send status req to mnode", pDnode); + dTrace("send status req to mnode, ahandle:%p", rpcMsg.ahandle); dndSendReqToMnode(pMgmt->pDnode, &rpcMsg); } -static void dndUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) { +static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) { if (pMgmt->dnodeId == 0) { dInfo("set dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId); taosWLockLatch(&pMgmt->latch); @@ -70,7 +70,7 @@ static void dndUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) { } void dmProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp) { - SDnodeMgmt *pMgmt = dndGetWrapper(pDnode, MNODE)->pMgmt; + SDnodeMgmt *pMgmt = dndGetWrapper(pDnode, DNODE)->pMgmt; if (pRsp->code != TSDB_CODE_SUCCESS) { if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->dropped && pMgmt->dnodeId > 0) { @@ -83,7 +83,7 @@ void dmProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp) { if (pRsp->pCont != NULL && pRsp->contLen != 0 && tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) { pMgmt->dver = statusRsp.dver; - dndUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg); + dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg); dmUpdateDnodeEps(pMgmt, statusRsp.pDnodeEps); } taosArrayDestroy(statusRsp.pDnodeEps); diff --git a/source/dnode/mgmt/dnode/src/dmWorker.c b/source/dnode/mgmt/dnode/src/dmWorker.c index 80acec2262..7747efe604 100644 --- a/source/dnode/mgmt/dnode/src/dmWorker.c +++ b/source/dnode/mgmt/dnode/src/dmWorker.c @@ -23,7 +23,7 @@ #include "smInt.h" #include "vmInt.h" -static void *dnodeThreadRoutine(void *param) { +static void *dmThreadRoutine(void *param) { SDnodeMgmt *pMgmt = param; SDnode *pDnode = pMgmt->pDnode; int64_t lastStatusTime = taosGetTimestampMs(); @@ -54,8 +54,10 @@ static void *dnodeThreadRoutine(void *param) { } } -static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) { - int32_t code = 0; +static void dmProcessMgmtQueue(SDnode *pDnode, SNodeMsg *pNodeMsg) { + int32_t code = 0; + SRpcMsg *pMsg = &pNodeMsg->rpcMsg; + dTrace("msg:%p, will be processed in mgmt queue", pNodeMsg); switch (pMsg->msgType) { case TDMT_DND_CREATE_MNODE: @@ -127,21 +129,23 @@ static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) { rpcFreeCont(pMsg->pCont); pMsg->pCont = NULL; - taosFreeQitem(pMsg); + taosFreeQitem(pNodeMsg); } int32_t dmStartWorker(SDnodeMgmt *pMgmt) { - if (dndInitWorker(pMgmt->pDnode, &pMgmt->mgmtWorker, DND_WORKER_SINGLE, "dnode-mgmt", 1, 1, dndProcessMgmtQueue) != 0) { + if (dndInitWorker(pMgmt->pDnode, &pMgmt->mgmtWorker, DND_WORKER_SINGLE, "dnode-mgmt", 1, 1, dmProcessMgmtQueue) != + 0) { dError("failed to start dnode mgmt worker since %s", terrstr()); return -1; } - if (dndInitWorker(pMgmt->pDnode, &pMgmt->statusWorker, DND_WORKER_SINGLE, "dnode-status", 1, 1, dndProcessMgmtQueue) != 0) { + if (dndInitWorker(pMgmt->pDnode, &pMgmt->statusWorker, DND_WORKER_SINGLE, "dnode-status", 1, 1, + dmProcessMgmtQueue) != 0) { dError("failed to start dnode mgmt worker since %s", terrstr()); return -1; } - pMgmt->threadId = taosCreateThread(dnodeThreadRoutine, pMgmt); + pMgmt->threadId = taosCreateThread(dmThreadRoutine, pMgmt); if (pMgmt->threadId == NULL) { dError("failed to init dnode thread"); terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -169,5 +173,6 @@ int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { pWorker = &pMgmt->statusWorker; } + dTrace("msg:%p, will be written to worker %s", pMsg, pWorker->name); return dndWriteMsgToWorker(pWorker, pMsg, sizeof(SNodeMsg)); } \ No newline at end of file diff --git a/source/dnode/mgmt/main/src/dndMain.c b/source/dnode/mgmt/main/src/dndMain.c index 9351f60e3d..370e9aa7b5 100644 --- a/source/dnode/mgmt/main/src/dndMain.c +++ b/source/dnode/mgmt/main/src/dndMain.c @@ -85,6 +85,7 @@ static int32_t dndRunDnode() { int32_t code = dndRun(pDnode); dInfo("start shutting down the TDengine service"); + global.pDnode = NULL; dndClose(pDnode); dndCleanup(); taosCloseLog(); diff --git a/source/os/src/osEnv.c b/source/os/src/osEnv.c index 63fa600217..61b2593bc6 100644 --- a/source/os/src/osEnv.c +++ b/source/os/src/osEnv.c @@ -101,6 +101,8 @@ void osUpdate() { } } +void osCleanup() {} + bool osLogSpaceAvailable() { return tsLogSpace.reserved <= tsLogSpace.size.avail; } void osSetTimezone(const char *timezone) { taosSetSystemTimezone(tsTimezone, tsTimezone, &tsDaylight); } -- GitLab