diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 8afe076b5f0811f1649922e9aac1753659c57458..6e649efeaf53a30b1864a25a582d39a6e4c60a0d 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -86,6 +86,8 @@ int32_t* taosGetErrno(); #define TSDB_CODE_INVALID_VERSION_STRING TAOS_DEF_ERROR_CODE(0, 0x0121) #define TSDB_CODE_VERSION_NOT_COMPATIBLE TAOS_DEF_ERROR_CODE(0, 0x0122) #define TSDB_CODE_COMPRESS_ERROR TAOS_DEF_ERROR_CODE(0, 0x0123) +#define TSDB_CODE_NODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0124) +#define TSDB_CODE_NODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0125) //client #define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200) diff --git a/source/dnode/mgmt/container/inc/dnd.h b/source/dnode/mgmt/container/inc/dnd.h index 8acefc22b088667115a9b478715fc0ba3dba5374..050a437405886c16744102c43a85db142336407f 100644 --- a/source/dnode/mgmt/container/inc/dnd.h +++ b/source/dnode/mgmt/container/inc/dnd.h @@ -105,6 +105,7 @@ typedef struct SMgmtWrapper { const char *name; char *path; int32_t refCount; + SRWLatch latch; bool deployed; bool dropped; bool required; @@ -147,7 +148,7 @@ typedef struct SDnode { EDndStatus dndGetStatus(SDnode *pDnode); void dndSetStatus(SDnode *pDnode, EDndStatus stat); -SMgmtWrapper *dndGetWrapper(SDnode *pDnode, ENodeType nodeType); +SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType nodeType); void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp); void dndReportStartup(SDnode *pDnode, char *pName, char *pDesc); void dndSendMonitorReport(SDnode *pDnode); diff --git a/source/dnode/mgmt/container/inc/dndInt.h b/source/dnode/mgmt/container/inc/dndInt.h index 817cbfbc0094cb4b8e11b6ec5fc54c1225996f68..ca60b850bfb073122b3a5917eb76542a781f41f8 100644 --- a/source/dnode/mgmt/container/inc/dndInt.h +++ b/source/dnode/mgmt/container/inc/dndInt.h @@ -50,6 +50,9 @@ SDnode *dndCreate(const SDnodeOpt *pOption); void dndClose(SDnode *pDnode); void dndHandleEvent(SDnode *pDnode, EDndEvent event); +SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType nodeType); +void dndReleaseWrapper(SMgmtWrapper *pWrapper); + // dndTransport.c int32_t dndInitServer(SDnode *pDnode); void dndCleanupServer(SDnode *pDnode); diff --git a/source/dnode/mgmt/container/src/dndExec.c b/source/dnode/mgmt/container/src/dndExec.c index c861ca3da77b7ed7c1b0d754175f85669b58c68d..c2a2205d707262f35249eefd4432f9412f065b41 100644 --- a/source/dnode/mgmt/container/src/dndExec.c +++ b/source/dnode/mgmt/container/src/dndExec.c @@ -25,27 +25,40 @@ static void dndResetLog(SMgmtWrapper *pMgmt) { taosInitLog(logname, 1); } -static bool dndRequireNode(SMgmtWrapper *pMgmt) { - bool required = (*pMgmt->fp.requiredFp)(pMgmt); +static bool dndRequireNode(SMgmtWrapper *pWrapper) { + bool required = (*pWrapper->fp.requiredFp)(pWrapper); if (!required) { - dDebug("node:%s, no need to start", pMgmt->name); + dDebug("node:%s, no need to start", pWrapper->name); } else { - dDebug("node:%s, need to start", pMgmt->name); + dDebug("node:%s, need to start", pWrapper->name); } return required; } -int32_t dndOpenNode(SMgmtWrapper *pWrapper) { return (*pWrapper->fp.openFp)(pWrapper); } +int32_t dndOpenNode(SMgmtWrapper *pWrapper) { + int32_t code = (*pWrapper->fp.openFp)(pWrapper); + if (code != 0) { + dError("node:%s, failed to open since %s", pWrapper->name, terrstr()); + return -1; + } else { + dDebug("node:%s, has been opened", pWrapper->name); + } + + pWrapper->deployed = true; + return 0; +} void dndCloseNode(SMgmtWrapper *pWrapper) { - if (pWrapper->required) { + taosWLockLatch(&pWrapper->latch); + if (pWrapper->deployed) { (*pWrapper->fp.closeFp)(pWrapper); - pWrapper->required = false; + pWrapper->deployed = false; } if (pWrapper->pProc) { taosProcCleanup(pWrapper->pProc); pWrapper->pProc = NULL; } + taosWUnLockLatch(&pWrapper->latch); } static int32_t dndRunInSingleProcess(SDnode *pDnode) { @@ -70,13 +83,14 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) { } } - SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, DNODE); - if (dmStart(pWrapper->pMgmt) != 0) { + SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, DNODE); + int32_t code = dmStart(pWrapper->pMgmt); + if (code != 0) { dError("failed to start dnode worker since %s", terrstr()); - return -1; } - return 0; + dndReleaseWrapper(pWrapper); + return code; } static void dndClearNodesExecpt(SDnode *pDnode, ENodeType except) { @@ -184,12 +198,14 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) { } } - SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, DNODE); + SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, DNODE); if (pWrapper->procType == PROC_PARENT && dmStart(pWrapper->pMgmt) != 0) { + dndReleaseWrapper(pWrapper); dError("failed to start dnode worker since %s", terrstr()); return -1; } + dndReleaseWrapper(pWrapper); return 0; } diff --git a/source/dnode/mgmt/container/src/dndInt.c b/source/dnode/mgmt/container/src/dndInt.c index e9c52ef575866254a3a903098be4b88ddd9aff4b..4decc6ba87323533f651311b41aa0c0e58a95555 100644 --- a/source/dnode/mgmt/container/src/dndInt.c +++ b/source/dnode/mgmt/container/src/dndInt.c @@ -65,8 +65,6 @@ void dndCleanup() { dInfo("dnode env is cleaned up"); } -SMgmtWrapper *dndGetWrapper(SDnode *pDnode, ENodeType nodeType) { return &pDnode->wrappers[nodeType]; } - void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp) { pWrapper->msgFps[TMSG_INDEX(msgType)] = nodeMsgFp; } diff --git a/source/dnode/mgmt/container/src/dndMonitor.c b/source/dnode/mgmt/container/src/dndMonitor.c index beeee685dc74a9f67bb4a5d102d88e5c6fc6218d..7c47275c45a4fff9f31447fc74b4cfec127e5f55 100644 --- a/source/dnode/mgmt/container/src/dndMonitor.c +++ b/source/dnode/mgmt/container/src/dndMonitor.c @@ -22,7 +22,7 @@ static int32_t dndGetMonitorDiskInfo(SDnode *pDnode, SMonDiskInfo *pInfo) { tstrncpy(pInfo->tempdir.name, tsTempDir, sizeof(pInfo->tempdir.name)); pInfo->tempdir.size = tsTempSpace.size; - return vmGetTfsMonitorInfo(dndGetWrapper(pDnode, VNODES), pInfo); + return vmGetTfsMonitorInfo(dndAcquireWrapper(pDnode, VNODES), pInfo); } static void dndGetMonitorBasicInfo(SDnode *pDnode, SMonBasicInfo *pInfo) { @@ -45,8 +45,8 @@ static void dndGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) { taosGetCardInfo(&pInfo->net_in, &pInfo->net_out); taosGetProcIO(&pInfo->io_read, &pInfo->io_write, &pInfo->io_read_disk, &pInfo->io_write_disk); - vmGetVnodeReqs(dndGetWrapper(pDnode, VNODES), pInfo); - pInfo->has_mnode = (dndGetWrapper(pDnode, MNODE)->required); + vmGetVnodeReqs(dndAcquireWrapper(pDnode, VNODES), pInfo); + pInfo->has_mnode = (dndAcquireWrapper(pDnode, MNODE)->required); } void dndSendMonitorReport(SDnode *pDnode) { @@ -63,7 +63,7 @@ void dndSendMonitorReport(SDnode *pDnode) { SMonClusterInfo clusterInfo = {0}; SMonVgroupInfo vgroupInfo = {0}; SMonGrantInfo grantInfo = {0}; - if (mmGetMonitorInfo(dndGetWrapper(pDnode, MNODE), &clusterInfo, &vgroupInfo, &grantInfo) == 0) { + if (mmGetMonitorInfo(dndAcquireWrapper(pDnode, MNODE), &clusterInfo, &vgroupInfo, &grantInfo) == 0) { monSetClusterInfo(pMonitor, &clusterInfo); monSetVgroupInfo(pMonitor, &vgroupInfo); monSetGrantInfo(pMonitor, &grantInfo); diff --git a/source/dnode/mgmt/container/src/dndMsg.c b/source/dnode/mgmt/container/src/dndMsg.c index 55f1ae2f80ba42e2244e0ae7f272e5c01750a53f..9497c9368db642df4678aac9fa975d6b10709981 100644 --- a/source/dnode/mgmt/container/src/dndMsg.c +++ b/source/dnode/mgmt/container/src/dndMsg.c @@ -16,8 +16,6 @@ #define _DEFAULT_SOURCE #include "dndInt.h" - - static int32_t dndBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc, SEpSet *pEpSet) { SRpcConnInfo connInfo = {0}; if ((pRpc->msgType & 1U) && rpcGetConnInfo(pRpc->handle, &connInfo) != 0) { @@ -34,7 +32,7 @@ static int32_t dndBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc, SEpSet *pEpSet) { void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) { if (pEpSet && pEpSet->numOfEps > 0 && pRpc->msgType == TDMT_MND_STATUS_RSP) { - dmUpdateMnodeEpSet(dndGetWrapper(pWrapper->pDnode, DNODE)->pMgmt, pEpSet); + dmUpdateMnodeEpSet(dndAcquireWrapper(pWrapper->pDnode, DNODE)->pMgmt, pEpSet); } int32_t code = -1; @@ -91,13 +89,13 @@ static SMgmtWrapper *dndGetWrapperFromMsg(SDnode *pDnode, SNodeMsg *pMsg) { SMgmtWrapper *pWrapper = NULL; switch (pMsg->rpcMsg.msgType) { case TDMT_DND_CREATE_MNODE: - return dndGetWrapper(pDnode, MNODE); + return dndAcquireWrapper(pDnode, MNODE); case TDMT_DND_CREATE_QNODE: - return dndGetWrapper(pDnode, QNODE); + return dndAcquireWrapper(pDnode, QNODE); case TDMT_DND_CREATE_SNODE: - return dndGetWrapper(pDnode, SNODE); + return dndAcquireWrapper(pDnode, SNODE); case TDMT_DND_CREATE_BNODE: - return dndGetWrapper(pDnode, BNODE); + return dndAcquireWrapper(pDnode, BNODE); default: return NULL; } diff --git a/source/dnode/mgmt/container/src/dndObj.c b/source/dnode/mgmt/container/src/dndObj.c index 4cd0300e7ef802ea078d4556fd621f3ed8bfa037..e148710c1b05610009630c9cb88ad5ac6664ee5d 100644 --- a/source/dnode/mgmt/container/src/dndObj.c +++ b/source/dnode/mgmt/container/src/dndObj.c @@ -94,6 +94,7 @@ SDnode *dndCreate(const SDnodeOpt *pOption) { } pWrapper->procType = PROC_SINGLE; + taosInitRWLatch(&pWrapper->latch); } code = 0; @@ -136,3 +137,29 @@ void dndHandleEvent(SDnode *pDnode, EDndEvent event) { dInfo("dnode object receive event %d, data:%p", event, pDnode); pDnode->event = event; } + +SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType nodeType) { + SMgmtWrapper *pWrapper = &pDnode->wrappers[nodeType]; + SMgmtWrapper *pRetWrapper = pWrapper; + + taosRLockLatch(&pWrapper->latch); + if (pWrapper->deployed) { + int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1); + dTrace("node:%s, is acquired, refCount:%d", pWrapper->name, refCount); + } else { + terrno = TSDB_CODE_NODE_NOT_DEPLOYED; + pRetWrapper = NULL; + } + taosRUnLockLatch(&pWrapper->latch); + + return pRetWrapper; +} + +void dndReleaseWrapper(SMgmtWrapper *pWrapper) { + if (pWrapper == NULL) return; + + taosRLockLatch(&pWrapper->latch); + int32_t refCount = atomic_sub_fetch_32(&pWrapper->refCount, 1); + taosRUnLockLatch(&pWrapper->latch); + dTrace("node:%s, is released, refCount:%d", pWrapper->name, refCount); +} \ No newline at end of file diff --git a/source/dnode/mgmt/container/src/dndTransport.c b/source/dnode/mgmt/container/src/dndTransport.c index bd337f4c73953c432336c4aa08c885fd4cebc835..9aa3bccb980e4333abf66a7f9e2d0f1ae20d37bd 100644 --- a/source/dnode/mgmt/container/src/dndTransport.c +++ b/source/dnode/mgmt/container/src/dndTransport.c @@ -125,7 +125,7 @@ static void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pRpcMsg, SRpcMsg *pRp STransMgmt *pMgmt = &pDnode->trans; SEpSet epSet = {0}; - dmGetMnodeEpSet(dndGetWrapper(pDnode, DNODE)->pMgmt, &epSet); + dmGetMnodeEpSet(dndAcquireWrapper(pDnode, DNODE)->pMgmt, &epSet); rpcSendRecv(pMgmt->clientRpc, &epSet, pRpcMsg, pRpcRsp); } @@ -159,7 +159,7 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char return 0; } - if (mmGetUserAuth(dndGetWrapper(pDnode, MNODE), user, spi, encrypt, secret, ckey) == 0) { + if (mmGetUserAuth(dndAcquireWrapper(pDnode, MNODE), user, spi, encrypt, secret, ckey) == 0) { dTrace("user:%s, get auth from mnode, spi:%d encrypt:%d", user, *spi, *encrypt); return 0; } @@ -287,14 +287,14 @@ int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pReq) { SDnode *pDnode = pWrapper->pDnode; STransMgmt *pTrans = &pDnode->trans; SEpSet epSet = {0}; - dmGetMnodeEpSet(dndGetWrapper(pDnode, DNODE)->pMgmt, &epSet); + dmGetMnodeEpSet(dndAcquireWrapper(pDnode, DNODE)->pMgmt, &epSet); return dndSendRpcReq(pTrans, &epSet, pReq); } } void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) { if (pRsp->code == TSDB_CODE_DND_MNODE_NOT_DEPLOYED || pRsp->code == TSDB_CODE_APP_NOT_READY) { - SMgmtWrapper *pDnodeWrapper = dndGetWrapper(pWrapper->pDnode, DNODE); + SMgmtWrapper *pDnodeWrapper = dndAcquireWrapper(pWrapper->pDnode, DNODE); dmSendRedirectRsp(pDnodeWrapper->pMgmt, pRsp); } else { rpcSendResponse(pRsp); diff --git a/source/dnode/mgmt/dnode/src/dmMsg.c b/source/dnode/mgmt/dnode/src/dmMsg.c index 8d56b48636c93a72d0cf62579ea9a77c1a8ce914..25b1fdd978bb40139f55d37cd2ebd78c4b550dbe 100644 --- a/source/dnode/mgmt/dnode/src/dmMsg.c +++ b/source/dnode/mgmt/dnode/src/dmMsg.c @@ -42,7 +42,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { taosRUnLockLatch(&pMgmt->latch); req.pVloads = taosArrayInit(TSDB_MAX_VNODES, sizeof(SVnodeLoad)); - vmGetVnodeLoads(dndGetWrapper(pDnode, VNODES), req.pVloads); + vmGetVnodeLoads(dndAcquireWrapper(pDnode, VNODES), req.pVloads); int32_t contLen = tSerializeSStatusReq(NULL, 0, &req); void *pHead = rpcMallocCont(contLen); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 1b961cd51a2b986a2b40f18bfa5eb309ce595d56..4dee1f45720a6ccafde8fe1179a9addee1472512 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -69,6 +69,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_APP_NOT_READY, "Database not ready") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQDN") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_INVALID_VERSION, "Invalid app version") TAOS_DEFINE_ERROR(TSDB_CODE_COMPRESS_ERROR, "Failed to compress msg") +TAOS_DEFINE_ERROR(TSDB_CODE_NODE_ALREADY_DEPLOYED, "Node already deployed") +TAOS_DEFINE_ERROR(TSDB_CODE_NODE_NOT_DEPLOYED, "Node not deployed") //common & util TAOS_DEFINE_ERROR(TSDB_CODE_OPS_NOT_SUPPORT, "Operation not supported")