diff --git a/src/mnode/inc/mgmtDnode.h b/src/mnode/inc/mgmtDnode.h index 30b121c54b493a0921775ae91c1f322b629b2e7b..f262cd3c5e8d9ed95db23112e62b65d8c4b35c75 100644 --- a/src/mnode/inc/mgmtDnode.h +++ b/src/mnode/inc/mgmtDnode.h @@ -25,6 +25,8 @@ int32_t mgmtInitDnodes(); void mgmtCleanUpDnodes(); int32_t mgmtGetDnodesNum(); void * mgmtGetNextDnode(void *pNode, SDnodeObj **pDnode); +void mgmtIncDnodeRef(SDnodeObj *pDnode); +void mgmtDecDnodeRef(SDnodeObj *pDnode); SDnodeObj* mgmtGetDnode(int32_t dnodeId); SDnodeObj* mgmtGetDnodeByIp(uint32_t ip); diff --git a/src/mnode/src/mgmtBalance.c b/src/mnode/src/mgmtBalance.c index 3b8a6abc86703ffc21570f74211917d74146db90..3a7ae8dcde4f9cfecf5367fd8faddf917921339a 100644 --- a/src/mnode/src/mgmtBalance.c +++ b/src/mnode/src/mgmtBalance.c @@ -29,6 +29,7 @@ int32_t mgmtAllocVnodes(SVgObj *pVgroup) { float vnodeUsage = 1.0; while (1) { + mgmtDecDnodeRef(pDnode); pNode = mgmtGetNextDnode(pNode, &pDnode); if (pDnode == NULL) break; if (pDnode->numOfTotalVnodes <= 0) continue; diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index d5203200eeeeedf9566d480d176cf3d02554cc21..f9ce526c7499f756ce20cf44985f3ae10d20bc1a 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -42,6 +42,8 @@ extern int32_t clusterInit(); extern void clusterCleanUp(); extern int32_t clusterGetDnodesNum(); extern void * clusterGetNextDnode(void *pNode, void **pDnode); +extern void clusterIncDnodeRef(SDnodeObj *pDnode); +extern void clusterDecDnodeRef(SDnodeObj *pDnode); extern SDnodeObj* clusterGetDnode(int32_t dnodeId); extern SDnodeObj* clusterGetDnodeByIp(uint32_t ip); #ifndef _CLUSTER @@ -118,6 +120,18 @@ int32_t mgmtGetDnodesNum() { #endif } +void mgmtIncDnodeRef(SDnodeObj *pDnode) { +#ifdef _CLUSTER + return clusterIncDnodeRef(pDnode); +#endif +} + +void mgmtDecDnodeRef(SDnodeObj *pDnode) { +#ifdef _CLUSTER + return clusterDecDnodeRef(pDnode); +#endif +} + void * mgmtGetNextDnode(void *pNode, SDnodeObj **pDnode) { #ifdef _CLUSTER return clusterGetNextDnode(pNode, pDnode); @@ -183,6 +197,13 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { pStatus->numOfCores = htons(pStatus->numOfCores); pStatus->numOfTotalVnodes = htons(pStatus->numOfTotalVnodes); + uint32_t version = htonl(pStatus->version); + if (version != tsVersion) { + mError("status msg version:%d not equal with mnode:%d", version, tsVersion); + mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_MSG_VERSION); + return ; + } + SDnodeObj *pDnode = NULL; if (pStatus->dnodeId == 0) { pDnode = mgmtGetDnodeByIp(pStatus->privateIp); @@ -199,13 +220,6 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { return; } } - - uint32_t version = htonl(pStatus->version); - if (version != tsVersion) { - mError("dnode:%d, status msg version:%d not equal with mnode:%d", pDnode->dnodeId, version, tsVersion); - mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_INVALID_MSG_VERSION); - return ; - } pDnode->privateIp = pStatus->privateIp; pDnode->publicIp = pStatus->publicIp; @@ -241,6 +255,8 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { mgmtStartBalanceTimer(200); } + mgmtDecDnodeRef(pDnode); + int32_t contLen = sizeof(SDMStatusRsp) + TSDB_MAX_VNODES * sizeof(SVnodeAccess); SDMStatusRsp *pRsp = rpcMallocCont(contLen); if (pRsp == NULL) { @@ -340,6 +356,8 @@ static int32_t mgmtGetDnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->pNode = NULL; + mgmtDecUserRef(pUser); + return 0; } @@ -351,6 +369,7 @@ static int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, voi char ipstr[32]; while (numOfRows < rows) { + mgmtDecDnodeRef(pDnode); pShow->pNode = mgmtGetNextDnode(pShow->pNode, (SDnodeObj **)&pDnode); if (pDnode == NULL) break; @@ -454,6 +473,7 @@ static int32_t mgmtGetModuleMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->pNode = NULL; + mgmtDecUserRef(pUser); return 0; } @@ -466,6 +486,7 @@ int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pCo char ipstr[20]; while (numOfRows < rows) { + mgmtDecDnodeRef(pDnode); pShow->pNode = mgmtGetNextDnode(pShow->pNode, (SDnodeObj **)&pDnode); if (pDnode == NULL) break; @@ -540,6 +561,7 @@ static int32_t mgmtGetConfigMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->pNode = NULL; + mgmtDecUserRef(pUser); return 0; } @@ -654,6 +676,8 @@ static int32_t mgmtGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo } pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; + mgmtDecDnodeRef(pDnode); + mgmtDecUserRef(pUser); return 0; } diff --git a/src/mnode/src/mgmtMnode.c b/src/mnode/src/mgmtMnode.c index 448b36b22434a53ef053d5fe94728827df49fdd6..4ccc4b8a0405b6c00524d4d942dd812a477ce5b6 100644 --- a/src/mnode/src/mgmtMnode.c +++ b/src/mnode/src/mgmtMnode.c @@ -120,6 +120,7 @@ static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo pShow->numOfRows = mgmtGetMnodesNum(); pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; pShow->pNode = NULL; + mgmtDecUserRef(pUser); return 0; } @@ -167,6 +168,7 @@ static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, voi } pShow->numOfReads += numOfRows; + return numOfRows; } diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index 9c1ba2326f0e3071104814bcda6084c96b231d48..ad12db8b11b4472fa60de2825de1eb26b0c27e67 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -58,6 +58,7 @@ static int32_t mgmtVgroupActionDestroy(SSdbOperDesc *pOper) { if (pDnode) { atomic_sub_fetch_32(&pDnode->openVnodes, 1); } + mgmtDecDnodeRef(pDnode); } tfree(pOper->pObj); @@ -96,6 +97,7 @@ static int32_t mgmtVgroupActionInsert(SSdbOperDesc *pOper) { pVgroup->vnodeGid[i].publicIp = pDnode->publicIp; pVgroup->vnodeGid[i].vnode = pVgroup->vgId; atomic_add_fetch_32(&pDnode->openVnodes, 1); + mgmtDecDnodeRef(pDnode); } mgmtAddVgroupIntoDb(pVgroup); @@ -295,10 +297,10 @@ int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { if (NULL == pTable || pTable->type == TSDB_SUPER_TABLE) { return TSDB_CODE_INVALID_TABLE_ID; } - + mgmtDecTableRef(pTable); pVgroup = mgmtGetVgroup(((SChildTableObj*)pTable)->vgId); if (NULL == pVgroup) return TSDB_CODE_INVALID_TABLE_ID; - mgmtDecTableRef(pTable); + mgmtDecVgroupRef(pVgroup); maxReplica = pVgroup->numOfVnodes > maxReplica ? pVgroup->numOfVnodes : maxReplica; } else { SVgObj *pVgroup = pDb->pHead; @@ -350,6 +352,8 @@ int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { pShow->pNode = pVgroup; } + mgmtDecDbRef(pDb); + return 0; } @@ -359,6 +363,7 @@ char *mgmtGetVnodeStatus(SVgObj *pVgroup, SVnodeGid *pVnode) { mError("vgroup:%d, not exist in dnode:%d", pVgroup->vgId, pDnode->dnodeId); return "null"; } + mgmtDecDnodeRef(pDnode); if (pDnode->status == TSDB_DN_STATUS_OFFLINE) { return "offline"; @@ -438,6 +443,8 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo } pShow->numOfReads += numOfRows; + mgmtDecDbRef(pDb); + return numOfRows; } @@ -634,13 +641,7 @@ static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) { code = TSDB_CODE_SDB_ERROR; } - SQueuedMsg *newMsg = calloc(1, sizeof(SQueuedMsg)); - newMsg->msgType = queueMsg->msgType; - newMsg->thandle = queueMsg->thandle; - newMsg->pUser = queueMsg->pUser; - newMsg->contLen = queueMsg->contLen; - newMsg->pCont = rpcMallocCont(newMsg->contLen); - memcpy(newMsg->pCont, queueMsg->pCont, newMsg->contLen); + SQueuedMsg *newMsg = mgmtCloneQueuedMsg(queueMsg); mgmtAddToShellQueue(newMsg); queueMsg->pCont = NULL; @@ -660,6 +661,7 @@ static void mgmtProcessVnodeCfgMsg(SRpcMsg *rpcMsg) { mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_NOT_ACTIVE_VNODE); return; } + mgmtDecDnodeRef(pDnode); SVgObj *pVgroup = mgmtGetVgroup(pCfg->vgId); if (pVgroup == NULL) { @@ -667,6 +669,7 @@ static void mgmtProcessVnodeCfgMsg(SRpcMsg *rpcMsg) { mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_NOT_ACTIVE_VNODE); return; } + mgmtDecVgroupRef(pVgroup); mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_SUCCESS); @@ -682,6 +685,7 @@ void mgmtDropAllVgroups(SDbObj *pDropDb) { SVgObj *pVgroup = NULL; while (1) { + mgmtDecVgroupRef(pVgroup); pNode = sdbFetchRow(tsVgroupSdb, pNode, (void **)&pVgroup); if (pVgroup == NULL) break;