diff --git a/source/dnode/mgmt/implement/inc/dndImp.h b/source/dnode/mgmt/implement/inc/dndImp.h index 31b2f0a53a379ffe65b0971ecb4a0628f1f73dde..804cc43d569ddcca968e2213c74700d1c6b04712 100644 --- a/source/dnode/mgmt/implement/inc/dndImp.h +++ b/source/dnode/mgmt/implement/inc/dndImp.h @@ -26,12 +26,13 @@ int32_t dndOpenNode(SMgmtWrapper *pWrapper); void dndCloseNode(SMgmtWrapper *pWrapper); // dndTransport.c -int32_t dndInitTrans(SDnode *pDnode); +int32_t dmInitTrans(SDnode *pDnode); void dndCleanupTrans(SDnode *pDnode); SProcCfg dndGenProcCfg(SMgmtWrapper *pWrapper); int32_t dndInitMsgHandle(SDnode *pDnode); -void dndSendMsgToMnode(SDnode *pDnode, SRpcMsg *pReq); +int32_t dndSendMsgToMnode(SDnode *pDnode, SRpcMsg *pReq); void dndSendRecv(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); +void dmSendToMnodeRecv(SDnode *pDnode, SRpcMsg *pReq, SRpcMsg *pRsp); // mgmt void dmSetMgmtFp(SMgmtWrapper *pWrapper); @@ -54,12 +55,11 @@ int32_t dmWriteFile(SDnodeData *pMgmt); void dmUpdateDnodeEps(SDnodeData *pMgmt, SArray *pDnodeEps); // dmHandle.c -void dmInitMsgHandle(SMgmtWrapper *pWrapper); -void dmSendStatusReq(SDnodeData *pMgmt); -int32_t dmProcessConfigReq(SDnodeData *pMgmt, SNodeMsg *pMsg); -int32_t dmProcessStatusRsp(SDnodeData *pMgmt, SNodeMsg *pMsg); -int32_t dmProcessAuthRsp(SDnodeData *pMgmt, SNodeMsg *pMsg); -int32_t dmProcessGrantRsp(SDnodeData *pMgmt, SNodeMsg *pMsg); +void dmSendStatusReq(SDnode *pDnode); +int32_t dmProcessConfigReq(SDnode *pDnode, SNodeMsg *pMsg); +int32_t dmProcessStatusRsp(SDnode *pDnode, SNodeMsg *pMsg); +int32_t dmProcessAuthRsp(SDnode *pDnode, SNodeMsg *pMsg); +int32_t dmProcessGrantRsp(SDnode *pDnode, SNodeMsg *pMsg); int32_t dmProcessCDnodeReq(SDnode *pDnode, SNodeMsg *pMsg); // dmMonitor.c @@ -67,11 +67,15 @@ void dmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo); void dmSendMonitorReport(SDnode *pDnode); // dmWorker.c -int32_t dmStartThread(SDnodeData *pMgmt); -int32_t dmStartWorker(SDnodeData *pMgmt); -void dmStopWorker(SDnodeData *pMgmt); +int32_t dmStartStatusThread(SDnode *pDnode); +void dmStopStatusThread(SDnode *pDnode); +int32_t dmStartMonitorThread(SDnode *pDnode); +void dmStopMonitorThread(SDnode *pDnode); + +int32_t dmStartWorker(SDnode *pDnode); +void dmStopWorker(SDnode *pDnode); int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -int32_t dmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t dmProcessStatusMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/implement/src/dndExec.c b/source/dnode/mgmt/implement/src/dndExec.c index c30e730d8a30792a30ce76e93f0f86157d4fb7f6..6e49c588c6589d4978f31fa32c33c7ef3aa728d7 100644 --- a/source/dnode/mgmt/implement/src/dndExec.c +++ b/source/dnode/mgmt/implement/src/dndExec.c @@ -299,7 +299,7 @@ static int32_t dndRunInChildProcess(SDnode *pDnode) { return -1; } - SMsgCb msgCb = dndCreateMsgcb(pWrapper); + SMsgCb msgCb = dmGetMsgcb(pWrapper); tmsgSetDefaultMsgCb(&msgCb); pWrapper->procType = DND_PROC_CHILD; diff --git a/source/dnode/mgmt/implement/src/dndHandle.c b/source/dnode/mgmt/implement/src/dndHandle.c index 196671c9168700708d25950beb3974e068011efd..f8e3cc5c0cc8ffd32c58ba10c52ff22e507ea36e 100644 --- a/source/dnode/mgmt/implement/src/dndHandle.c +++ b/source/dnode/mgmt/implement/src/dndHandle.c @@ -16,11 +16,33 @@ #define _DEFAULT_SOURCE #include "dndImp.h" -void dmSendStatusReq(SDnodeData *pMgmt) { - SDnode *pDnode = pMgmt->pDnode; +static int32_t dmProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp) { + SDnode *pDnode = pMgmt->pDnode; + + if (pRsp->code != TSDB_CODE_SUCCESS) { + if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pDnode->data.dropped && pDnode->data.dnodeId > 0) { + dInfo("dnode:%d, set to dropped since not exist in mnode", pDnode->data.dnodeId); + pDnode->data.dropped = 1; + dmWriteFile(pMgmt); + } + } else { + SStatusRsp statusRsp = {0}; + if (pRsp->pCont != NULL && pRsp->contLen != 0 && + tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) { + pMgmt->dnodeVer = statusRsp.dnodeVer; + dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg); + dmUpdateDnodeEps(pMgmt, statusRsp.pDnodeEps); + } + tFreeSStatusRsp(&statusRsp); + } + + return TSDB_CODE_SUCCESS; +} + +void dmSendStatusReq(SDnode *pDnode) { SStatusReq req = {0}; - taosRLockLatch(&pMgmt->latch); + taosRLockLatch(&pDnode->data.latch); req.sver = tsVersion; req.dnodeVer = pMgmt->dnodeVer; req.dnodeId = pDnode->data.dnodeId; @@ -38,7 +60,7 @@ void dmSendStatusReq(SDnodeData *pMgmt) { memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN); memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN); memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN); - taosRUnLockLatch(&pMgmt->latch); + taosRUnLockLatch(&pDnode->data.latch); SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, VNODE); if (pWrapper != NULL) { @@ -54,10 +76,11 @@ void dmSendStatusReq(SDnodeData *pMgmt) { taosArrayDestroy(req.pVloads); SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = (void *)0x9527}; - pMgmt->statusSent = 1; - + SRpcMsg rspMsg = {0}; + dTrace("send req:%s to mnode, app:%p", TMSG_INFO(rpcMsg.msgType), rpcMsg.ahandle); - dndSendMsgToMnode(pDnode, &rpcMsg); + dmSendToMnodeRecv(pDnode, rpcMsg, &rpcRsp); + dmProcessStatusRsp(pDnode, &rpcRsp); } static void dmUpdateDnodeCfg(SDnodeData *pMgmt, SDnodeCfg *pCfg) { @@ -65,37 +88,12 @@ static void dmUpdateDnodeCfg(SDnodeData *pMgmt, SDnodeCfg *pCfg) { if (pDnode->data.dnodeId == 0) { dInfo("set dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId); - taosWLockLatch(&pMgmt->latch); + taosWLockLatch(&pDnode->data.latch); pDnode->data.dnodeId = pCfg->dnodeId; pDnode->data.clusterId = pCfg->clusterId; dmWriteFile(pMgmt); - taosWUnLockLatch(&pMgmt->latch); - } -} - -int32_t dmProcessStatusRsp(SDnodeData *pMgmt, SNodeMsg *pMsg) { - SDnode *pDnode = pMgmt->pDnode; - SRpcMsg *pRsp = &pMsg->rpcMsg; - - if (pRsp->code != TSDB_CODE_SUCCESS) { - if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pDnode->data.dropped && pDnode->data.dnodeId > 0) { - dInfo("dnode:%d, set to dropped since not exist in mnode", pDnode->data.dnodeId); - pDnode->data.dropped = 1; - dmWriteFile(pMgmt); - } - } else { - SStatusRsp statusRsp = {0}; - if (pRsp->pCont != NULL && pRsp->contLen != 0 && - tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) { - pMgmt->dnodeVer = statusRsp.dnodeVer; - dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg); - dmUpdateDnodeEps(pMgmt, statusRsp.pDnodeEps); - } - tFreeSStatusRsp(&statusRsp); + taosWUnLockLatch(&pDnode->data.latch); } - - pMgmt->statusSent = 0; - return TSDB_CODE_SUCCESS; } int32_t dmProcessAuthRsp(SDnodeData *pMgmt, SNodeMsg *pMsg) { @@ -194,7 +192,7 @@ int32_t dmProcessCDnodeReq(SDnode *pDnode, SNodeMsg *pMsg) { } } -void dmInitMsgHandle(SMgmtWrapper *pWrapper) { +static void dmSetMsgHandle(SMgmtWrapper *pWrapper) { // Requests handled by DNODE dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE, dmProcessMgmtMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE, dmProcessMgmtMsg, DEFAULT_HANDLE); @@ -205,10 +203,84 @@ void dmInitMsgHandle(SMgmtWrapper *pWrapper) { dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE, dmProcessMgmtMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE, dmProcessMgmtMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE, dmProcessMgmtMsg, DEFAULT_HANDLE); - dndSetMsgHandle(pWrapper, TDMT_DND_NETWORK_TEST, dmProcessMgmtMsg, DEFAULT_HANDLE); // Requests handled by MNODE - dndSetMsgHandle(pWrapper, TDMT_MND_STATUS_RSP, dmProcessMonitorMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_MND_GRANT_RSP, dmProcessMgmtMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, dmProcessMgmtMsg, DEFAULT_HANDLE); } + +static int32_t dmStart(SMgmtWrapper *pWrapper) { return dmStartStatusThread(pWrapper->pDnode); } + +static void dmStop(SMgmtWrapper *pWrapper) { dmStopThread(pWrapper->pDnode); } + +static int32_t dmInit(SMgmtWrapper *pWrapper) { + dInfo("dnode-data start to init"); + SDnode *pDnode = pWrapper->pDnode; + + pDnode->data.dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); + if (pDnode->data.dnodeHash == NULL) { + dError("failed to init dnode hash"); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + if (dmReadFile(pDnode) != 0) { + dError("failed to read file since %s", terrstr()); + return -1; + } + + if (pDnode->data.dropped) { + dError("dnode will not start since its already dropped"); + return -1; + } + + if (dmStartWorker(pDnode) != 0) { + return -1; + } + + if (dmInitTrans(pDnode) != 0) { + dError("failed to init transport since %s", terrstr()); + return -1; + } + + dInfo("dnode-data is initialized"); + return 0; +} + +static void dmCleanup(SMgmtWrapper *pWrapper) { + dInfo("dnode-data start to clean up"); + SDnode *pDnode = pWrapper->pDnode; + dmStopWorker(pDnode); + + taosWLockLatch(&pDnode->data.latch); + if (pMgmt->dnodeEps != NULL) { + taosArrayDestroy(pMgmt->dnodeEps); + pMgmt->dnodeEps = NULL; + } + if (pMgmt->dnodeHash != NULL) { + taosHashCleanup(pMgmt->dnodeHash); + pMgmt->dnodeHash = NULL; + } + taosWUnLockLatch(&pDnode->data.latch); + + dndCleanupTrans(pDnode); + dInfo("dnode-data is cleaned up"); +} + +static int32_t dmRequire(SMgmtWrapper *pWrapper, bool *required) { + *required = true; + return 0; +} + +void dmSetMgmtFp(SMgmtWrapper *pWrapper) { + SMgmtFp mgmtFp = {0}; + mgmtFp.openFp = dmInit; + mgmtFp.closeFp = dmCleanup; + mgmtFp.startFp = dmStart; + mgmtFp.stopFp = dmStop; + mgmtFp.requiredFp = dmRequire; + + dmSetMsgHandle(pWrapper); + pWrapper->name = "dnode"; + pWrapper->fp = mgmtFp; +} diff --git a/source/dnode/mgmt/implement/src/dndObj.c b/source/dnode/mgmt/implement/src/dndObj.c index bfbdb756c6fe984f2157f5975c7a322ea5e25153..8b20a446db6bd34c27f3af469e780c6799660207 100644 --- a/source/dnode/mgmt/implement/src/dndObj.c +++ b/source/dnode/mgmt/implement/src/dndObj.c @@ -17,103 +17,11 @@ #include "dndImp.h" -static int32_t dmStart(SMgmtWrapper *pWrapper) { - dDebug("dnode-mgmt start to run"); - return dmStartThread(pWrapper->pMgmt); -} - -static int32_t dmInit(SMgmtWrapper *pWrapper) { - SDnode *pDnode = pWrapper->pDnode; - SDnodeData *pMgmt = taosMemoryCalloc(1, sizeof(SDnodeData)); - dInfo("dnode-mgmt start to init"); - +static int32_t dndInitVars(SDnode *pDnode, const SDnodeOpt *pOption) { pDnode->data.dnodeId = 0; pDnode->data.dropped = 0; pDnode->data.clusterId = 0; - pMgmt->path = pWrapper->path; - pMgmt->pDnode = pDnode; - taosInitRWLatch(&pMgmt->latch); - - pMgmt->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); - if (pMgmt->dnodeHash == NULL) { - dError("failed to init dnode hash"); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - if (dmReadFile(pMgmt) != 0) { - dError("failed to read file since %s", terrstr()); - return -1; - } - - if (pDnode->data.dropped) { - dError("dnode will not start since its already dropped"); - return -1; - } - - if (dmStartWorker(pMgmt) != 0) { - return -1; - } - - if (dndInitTrans(pDnode) != 0) { - dError("failed to init transport since %s", terrstr()); - return -1; - } - - pWrapper->pMgmt = pMgmt; - pMgmt->msgCb = dndCreateMsgcb(pWrapper); - - dInfo("dnode-mgmt is initialized"); - return 0; -} - -static void dmCleanup(SMgmtWrapper *pWrapper) { - SDnodeData *pMgmt = pWrapper->pMgmt; - if (pMgmt == NULL) return; - - dInfo("dnode-mgmt start to clean up"); - SDnode *pDnode = pMgmt->pDnode; - dmStopWorker(pMgmt); - - taosWLockLatch(&pMgmt->latch); - - if (pMgmt->dnodeEps != NULL) { - taosArrayDestroy(pMgmt->dnodeEps); - pMgmt->dnodeEps = NULL; - } - - if (pMgmt->dnodeHash != NULL) { - taosHashCleanup(pMgmt->dnodeHash); - pMgmt->dnodeHash = NULL; - } - - taosWUnLockLatch(&pMgmt->latch); - - taosMemoryFree(pMgmt); - pWrapper->pMgmt = NULL; - dndCleanupTrans(pDnode); - - dInfo("dnode-mgmt is cleaned up"); -} - -static int32_t dmRequire(SMgmtWrapper *pWrapper, bool *required) { - *required = true; - return 0; -} - -void dmSetMgmtFp(SMgmtWrapper *pWrapper) { - SMgmtFp mgmtFp = {0}; - mgmtFp.openFp = dmInit; - mgmtFp.closeFp = dmCleanup; - mgmtFp.startFp = dmStart; - mgmtFp.requiredFp = dmRequire; - - dmInitMsgHandle(pWrapper); - pWrapper->name = "dnode"; - pWrapper->fp = mgmtFp; -} - -static int32_t dndInitVars(SDnode *pDnode, const SDnodeOpt *pOption) { + pDnode->data.supportVnodes = pOption->numOfSupportVnodes; pDnode->data.serverPort = pOption->serverPort; pDnode->data.dataDir = strdup(pOption->dataDir); @@ -139,6 +47,7 @@ static int32_t dndInitVars(SDnode *pDnode, const SDnodeOpt *pOption) { } } + taosInitRWLatch(&pDnode->data.latch); return 0; } @@ -212,7 +121,7 @@ SDnode *dndCreate(const SDnodeOpt *pOption) { goto _OVER; } - SMsgCb msgCb = dndCreateMsgcb(&pDnode->wrappers[0]); + SMsgCb msgCb = dmGetMsgcb(&pDnode->wrappers[0]); tmsgSetDefaultMsgCb(&msgCb); dInfo("dnode is created, data:%p", pDnode); @@ -245,5 +154,3 @@ void dndHandleEvent(SDnode *pDnode, EDndEvent event) { pDnode->event = event; } } - - diff --git a/source/dnode/mgmt/implement/src/dndTransport.c b/source/dnode/mgmt/implement/src/dndTransport.c index 7d9ba1d56d2ba2afe420bfa28c3d6b875bad00fa..bf330b0f7bd9b6a83ee01278894ec96e1ae69eab 100644 --- a/source/dnode/mgmt/implement/src/dndTransport.c +++ b/source/dnode/mgmt/implement/src/dndTransport.c @@ -247,13 +247,13 @@ void dndSendRecv(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) { rpcSendRecv(pDnode->trans.clientRpc, pEpSet, pReq, pRsp); } -void dndSendMsgToMnode(SDnode *pDnode, SRpcMsg *pReq) { +int32_t dndSendMsgToMnode(SDnode *pDnode, SRpcMsg *pReq) { SEpSet epSet = {0}; dndGetMnodeEpSet(pDnode, &epSet); - dndSendRpcReq(pDnode, &epSet, pReq); + return dndSendRpcReq(pDnode, &epSet, pReq); } -static inline void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pReq, SRpcMsg *pRsp) { +void dmSendToMnodeRecv(SDnode *pDnode, SRpcMsg *pReq, SRpcMsg *pRsp) { SEpSet epSet = {0}; dndGetMnodeEpSet(pDnode, &epSet); rpcSendRecv(pDnode->trans.clientRpc, &epSet, pReq, pRsp); @@ -453,7 +453,7 @@ static inline int32_t dndRetrieveUserAuthInfo(SDnode *pDnode, char *user, char * SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528}; SRpcMsg rpcRsp = {0}; dTrace("user:%s, send user auth req to other mnodes, spi:%d encrypt:%d", user, authReq.spi, authReq.encrypt); - dndSendMsgToMnodeRecv(pDnode, &rpcMsg, &rpcRsp); + dmSendToMnodeRecv(pDnode, &rpcMsg, &rpcRsp); if (rpcRsp.code != 0) { terrno = rpcRsp.code; @@ -506,7 +506,7 @@ static void dndCleanupServer(SDnode *pDnode) { } } -int32_t dndInitTrans(SDnode *pDnode) { +int32_t dmInitTrans(SDnode *pDnode) { if (dndInitServer(pDnode) != 0) return -1; if (dndInitClient(pDnode) != 0) return -1; diff --git a/source/dnode/mgmt/implement/src/dndWorker.c b/source/dnode/mgmt/implement/src/dndWorker.c index 1f42c8105ba816143ba12e4f887ab13734a52874..975150e2304c19b8070133636e8358eee48802cc 100644 --- a/source/dnode/mgmt/implement/src/dndWorker.c +++ b/source/dnode/mgmt/implement/src/dndWorker.c @@ -16,41 +16,78 @@ #define _DEFAULT_SOURCE #include "dndImp.h" -static void *dmThreadRoutine(void *param) { - SDnodeData *pMgmt = param; - SDnode *pDnode = pMgmt->pDnode; - int64_t lastStatusTime = taosGetTimestampMs(); - int64_t lastMonitorTime = lastStatusTime; +static void *dmStatusThreadFp(void *param) { + SDnode *pDnode = param; + int64_t lastTime = taosGetTimestampMs(); - setThreadName("dnode-hb"); + setThreadName("dnode-status"); - while (true) { + while (1) { taosThreadTestCancel(); taosMsleep(200); - if (dndGetStatus(pDnode) != DND_STAT_RUNNING || pDnode->data.dropped) { + + if (pDnode->status != DND_STAT_RUNNING || pDnode->data.dropped) { continue; } int64_t curTime = taosGetTimestampMs(); - float statusInterval = (curTime - lastStatusTime) / 1000.0f; - if (statusInterval >= tsStatusInterval && !pMgmt->statusSent) { + float interval = (curTime - lastTime) / 1000.0f; + if (interval >= tsStatusInterval) { dmSendStatusReq(pMgmt); - lastStatusTime = curTime; + lastTime = curTime; + } + } + + return NULL; +} + +static void *dmMonitorThreadFp(void *param) { + SDnode *pDnode = param; + int64_t lastTime = taosGetTimestampMs(); + + setThreadName("dnode-monitor"); + + while (1) { + taosThreadTestCancel(); + taosMsleep(200); + + if (pDnode->status != DND_STAT_RUNNING || pDnode->data.dropped) { + continue; } - float monitorInterval = (curTime - lastMonitorTime) / 1000.0f; - if (monitorInterval >= tsMonitorInterval) { + int64_t curTime = taosGetTimestampMs(); + float interval = (curTime - lastTime) / 1000.0f; + if (interval >= tsMonitorInterval) { dmSendMonitorReport(pDnode); - lastMonitorTime = curTime; + lastTime = curTime; } } - return TSDB_CODE_SUCCESS; + + return NULL; +} + +int32_t dmStartStatusThread(SDnode *pDnode) { + pDnode->statusThreadId = taosCreateThread(dmStatusThreadFp, pDnode); + if (pDnode->statusThreadId == NULL) { + dError("failed to init dnode status thread"); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + return 0; +} + +void dmStopStatusThread(SDnode *pDnode) { + if (pDnode->statusThreadId != NULL) { + taosDestoryThread(pDnode->statusThreadId); + pDnode->statusThreadId = NULL; + } } -int32_t dmStartThread(SDnodeData *pMgmt) { - pMgmt->threadId = taosCreateThread(dmThreadRoutine, pMgmt); - if (pMgmt->threadId == NULL) { - dError("failed to init dnode thread"); +int32_t dmStartMonitorThread(SDnode *pDnode) { + pDnode->monitorThreadId = taosCreateThread(dmMonitorThreadFp, pDnode); + if (pDnode->monitorThreadId == NULL) { + dError("failed to init dnode monitor thread"); terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -58,21 +95,23 @@ int32_t dmStartThread(SDnodeData *pMgmt) { return 0; } -static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { - SDnodeData *pMgmt = pInfo->ahandle; +void dmStopMonitorThread(SDnode *pDnode) { + if (pMgmt->monitorThreadId != NULL) { + taosDestoryThread(pMgmt->monitorThreadId); + pMgmt->monitorThreadId = NULL; + } +} - SDnode *pDnode = pMgmt->pDnode; +static void dmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { + SDnode *pDnode = pInfo->ahandle; SRpcMsg *pRpc = &pMsg->rpcMsg; int32_t code = -1; - dTrace("msg:%p, will be processed in dnode queue", pMsg); + dTrace("msg:%p, will be processed in dnode-mgmt queue", pMsg); switch (pRpc->msgType) { case TDMT_DND_CONFIG_DNODE: code = dmProcessConfigReq(pMgmt, pMsg); break; - case TDMT_MND_STATUS_RSP: - code = dmProcessStatusRsp(pMgmt, pMsg); - break; case TDMT_MND_AUTH_RSP: code = dmProcessAuthRsp(pMgmt, pMsg); break; @@ -96,15 +135,9 @@ static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { } int32_t dmStartWorker(SDnodeData *pMgmt) { - SSingleWorkerCfg mcfg = {.min = 1, .max = 1, .name = "dnode-mgmt", .fp = (FItem)dmProcessQueue, .param = pMgmt}; - if (tSingleWorkerInit(&pMgmt->mgmtWorker, &mcfg) != 0) { - dError("failed to start dnode mgmt worker since %s", terrstr()); - return -1; - } - - SSingleWorkerCfg scfg = {.min = 1, .max = 1, .name = "dnode-monitor", .fp = (FItem)dmProcessQueue, .param = pMgmt}; - if (tSingleWorkerInit(&pMgmt->monitorWorker, &scfg) != 0) { - dError("failed to start dnode monitor worker since %s", terrstr()); + SSingleWorkerCfg cfg = {.min = 1, .max = 1, .name = "dnode-mgmt", .fp = (FItem)dmProcessMgmtQueue, .param = pMgmt}; + if (tSingleWorkerInit(&pMgmt->mgmtWorker, &cfg) != 0) { + dError("failed to start dnode-mgmt worker since %s", terrstr()); return -1; } @@ -114,12 +147,6 @@ int32_t dmStartWorker(SDnodeData *pMgmt) { void dmStopWorker(SDnodeData *pMgmt) { tSingleWorkerCleanup(&pMgmt->mgmtWorker); - tSingleWorkerCleanup(&pMgmt->monitorWorker); - - if (pMgmt->threadId != NULL) { - taosDestoryThread(pMgmt->threadId); - pMgmt->threadId = NULL; - } dDebug("dnode workers are closed"); } @@ -132,7 +159,7 @@ int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { return 0; } -int32_t dmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { +int32_t dmProcessStatusMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { SDnodeData *pMgmt = pWrapper->pMgmt; SSingleWorker *pWorker = &pMgmt->monitorWorker; diff --git a/source/dnode/mgmt/interface/inc/dndDef.h b/source/dnode/mgmt/interface/inc/dndDef.h index 0ce5a64ed0ad9b4a0daa28c7522194f4f4c81536..077b15d021013f7f1ec836b7a85810a96cefc00c 100644 --- a/source/dnode/mgmt/interface/inc/dndDef.h +++ b/source/dnode/mgmt/interface/inc/dndDef.h @@ -110,29 +110,26 @@ typedef struct { int64_t updateTime; int64_t rebootTime; bool dropped; - int8_t statusSent; SEpSet mnodeEpSet; SHashObj *dnodeHash; SArray *dnodeEps; - TdThread *threadId; + TdThread *statusThreadId; + TdThread *monitorThreadId; SRWLatch latch; SSingleWorker mgmtWorker; - SSingleWorker monitorWorker; SMsgCb msgCb; SDnode *pDnode; const char *path; TdFilePtr lockfile; - struct { - char *localEp; - char *localFqdn; - char *firstEp; - char *secondEp; - char *dataDir; - SDiskCfg *disks; - int32_t numOfDisks; - int32_t supportVnodes; - uint16_t serverPort; - }; + char *localEp; + char *localFqdn; + char *firstEp; + char *secondEp; + char *dataDir; + SDiskCfg *disks; + int32_t numOfDisks; + int32_t supportVnodes; + uint16_t serverPort; } SDnodeData; typedef struct SDnode { diff --git a/source/dnode/mgmt/interface/inc/dndInt.h b/source/dnode/mgmt/interface/inc/dndInt.h index f1583360375b94ac747c724561a30486ffaf096f..b178a64d35c53f8ddae3f8dca9315879c3b9e614 100644 --- a/source/dnode/mgmt/interface/inc/dndInt.h +++ b/source/dnode/mgmt/interface/inc/dndInt.h @@ -37,7 +37,7 @@ void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc); void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg); void dndGetMonitorSysInfo(SMonSysInfo *pInfo); -SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper); +SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper); // dndFile.c int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed); diff --git a/source/dnode/mgmt/interface/src/dndInt.c b/source/dnode/mgmt/interface/src/dndInt.c index 52091ceb17c834629e79b8ed5b8780e98b85c1ba..88312202b6c1e597f492f593b9fa911b9495f3be 100644 --- a/source/dnode/mgmt/interface/src/dndInt.c +++ b/source/dnode/mgmt/interface/src/dndInt.c @@ -174,7 +174,7 @@ void dndGetMonitorSysInfo(SMonSysInfo *pInfo) { taosGetProcIODelta(&pInfo->io_read, &pInfo->io_write, &pInfo->io_read_disk, &pInfo->io_write_disk); } -SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper) { +SMsgCb dmGetMsgcb(SMgmtWrapper *pWrapper) { SMsgCb msgCb = pWrapper->pDnode->data.msgCb; msgCb.pWrapper = pWrapper; return msgCb; diff --git a/source/dnode/mgmt/mgmt_bnode/src/bmInt.c b/source/dnode/mgmt/mgmt_bnode/src/bmInt.c index 71ba2bab8c3cc88dd0ce002c70c0a0cbb5467fa3..bca6e0432a8affa89d545124461e3479beba51f4 100644 --- a/source/dnode/mgmt/mgmt_bnode/src/bmInt.c +++ b/source/dnode/mgmt/mgmt_bnode/src/bmInt.c @@ -19,7 +19,7 @@ static int32_t bmRequire(SMgmtWrapper *pWrapper, bool *required) { return dndReadFile(pWrapper, required); } static void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) { - SMsgCb msgCb = dndCreateMsgcb(pMgmt->pWrapper); + SMsgCb msgCb = dmGetMsgcb(pMgmt->pWrapper); pOption->msgCb = msgCb; } diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c index 374b33ed0156707f3e5b9924e5be8987a1531bec..2d8914df2581fc29abfea0a5d3a00b0eaeeec832 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c @@ -39,7 +39,7 @@ static int32_t mmRequire(SMgmtWrapper *pWrapper, bool *required) { } static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { - SMsgCb msgCb = dndCreateMsgcb(pMgmt->pWrapper); + SMsgCb msgCb = dmGetMsgcb(pMgmt->pWrapper); msgCb.queueFps[QUERY_QUEUE] = mmPutMsgToQueryQueue; msgCb.queueFps[READ_QUEUE] = mmPutMsgToReadQueue; msgCb.queueFps[WRITE_QUEUE] = mmPutMsgToWriteQueue; diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmInt.c b/source/dnode/mgmt/mgmt_qnode/src/qmInt.c index cb06fe5d25a6df6f760c3ff41ff3dd88d71e802d..d211faaa3510be502fe0214a68104af9e661e050 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmInt.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmInt.c @@ -19,7 +19,7 @@ static int32_t qmRequire(SMgmtWrapper *pWrapper, bool *required) { return dndReadFile(pWrapper, required); } static void qmInitOption(SQnodeMgmt *pMgmt, SQnodeOpt *pOption) { - SMsgCb msgCb = dndCreateMsgcb(pMgmt->pWrapper); + SMsgCb msgCb = dmGetMsgcb(pMgmt->pWrapper); msgCb.queueFps[QUERY_QUEUE] = qmPutMsgToQueryQueue; msgCb.queueFps[FETCH_QUEUE] = qmPutMsgToFetchQueue; msgCb.qsizeFp = qmGetQueueSize; diff --git a/source/dnode/mgmt/mgmt_snode/src/smInt.c b/source/dnode/mgmt/mgmt_snode/src/smInt.c index 02958104a0eff1dc2be7921ae91b56231d1f47cf..7d3bfd6ab235a785db6e1f642cfc40af1172e2a5 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smInt.c +++ b/source/dnode/mgmt/mgmt_snode/src/smInt.c @@ -19,7 +19,7 @@ static int32_t smRequire(SMgmtWrapper *pWrapper, bool *required) { return dndReadFile(pWrapper, required); } static void smInitOption(SSnodeMgmt *pMgmt, SSnodeOpt *pOption) { - SMsgCb msgCb = dndCreateMsgcb(pMgmt->pWrapper); + SMsgCb msgCb = dmGetMsgcb(pMgmt->pWrapper); pOption->msgCb = msgCb; } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index f9e2b45d74a1a46afee85d6036f254f3d4af1f55..813bc9c5f7576d8738f480e5510b6c66eccb3ae1 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -143,7 +143,7 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return -1; } - SMsgCb msgCb = dndCreateMsgcb(pMgmt->pWrapper); + SMsgCb msgCb = dmGetMsgcb(pMgmt->pWrapper); msgCb.pWrapper = pMgmt->pWrapper; msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue; msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index b298b4c93a37e0720ce4d8da76d6205b5a2cc23b..94a6910ba3d73f214170439535f75a7a145b535d 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -128,7 +128,7 @@ static void *vmOpenVnodeFunc(void *param) { pMgmt->state.openVnodes, pMgmt->state.totalVnodes); dndReportStartup(pDnode, "open-vnodes", stepDesc); - SMsgCb msgCb = dndCreateMsgcb(pMgmt->pWrapper); + SMsgCb msgCb = dmGetMsgcb(pMgmt->pWrapper); msgCb.pWrapper = pMgmt->pWrapper; msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue; msgCb.queueFps[FETCH_QUEUE] = vmPutMsgToFetchQueue;