diff --git a/src/mnode/inc/mgmtMnode.h b/src/mnode/inc/mgmtMnode.h index b9a135346dc971205f97900149d6839f879a61cf..0973aa6ea67985e5736c1b90f3d7c2ba1286e642 100644 --- a/src/mnode/inc/mgmtMnode.h +++ b/src/mnode/inc/mgmtMnode.h @@ -44,7 +44,7 @@ void mgmtDecMnodeRef(struct SMnodeObj *pMnode); char * mgmtGetMnodeRoleStr(); void mgmtGetMnodeIpSet(SRpcIpSet *ipSet); void mgmtGetMnodeInfos(void *mnodes); - +void mgmtUpdateMnodeIpSet(); #ifdef __cplusplus } diff --git a/src/mnode/src/mgmtMnode.c b/src/mnode/src/mgmtMnode.c index d9ddd465f6d7ac1e60db572e2839905a498bac03..0bb4895e36dfb75a2249bac94f5967894e288586 100644 --- a/src/mnode/src/mgmtMnode.c +++ b/src/mnode/src/mgmtMnode.c @@ -36,6 +36,25 @@ static int32_t tsMnodeUpdateSize = 0; static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); +static SRpcIpSet tsMnodeRpcIpSet; +static SDMMnodeInfos tsMnodeInfos; + +#if defined(LINUX) + static pthread_rwlock_t tsMnodeLock; + #define mgmtMnodeWrLock() pthread_rwlock_wrlock(&tsMnodeLock) + #define mgmtMnodeRdLock() pthread_rwlock_rdlock(&tsMnodeLock) + #define mgmtMnodeUnLock() pthread_rwlock_unlock(&tsMnodeLock) + #define mgmtMnodeInitLock() pthread_rwlock_init(&tsMnodeLock, NULL) + #define mgmtMnodeDestroyLock() pthread_rwlock_destroy(&tsMnodeLock) +#else + static pthread_mutex_t tsMnodeLock; + #define mgmtMnodeWrLock() pthread_mutex_lock(&tsMnodeLock) + #define mgmtMnodeRdLock() pthread_mutex_lock(&tsMnodeLock) + #define mgmtMnodeUnLock() pthread_mutex_unlock(&tsMnodeLock) + #define mgmtMnodeInitLock() pthread_mutex_init(&tsMnodeLock, NULL) + #define mgmtMnodeDestroyLock() pthread_mutex_destroy(&tsMnodeLock) +#endif + static int32_t mgmtMnodeActionDestroy(SSdbOper *pOper) { tfree(pOper->pObj); return TSDB_CODE_SUCCESS; @@ -106,6 +125,8 @@ static int32_t mgmtMnodeActionRestored() { } int32_t mgmtInitMnodes() { + mgmtMnodeInitLock(); + SMnodeObj tObj; tsMnodeUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj; @@ -140,6 +161,7 @@ int32_t mgmtInitMnodes() { void mgmtCleanupMnodes() { sdbCloseTable(tsMnodeSdb); + mgmtMnodeDestroyLock(); } int32_t mgmtGetMnodesNum() { @@ -177,50 +199,52 @@ char *mgmtGetMnodeRoleStr(int32_t role) { } } -void mgmtGetMnodeIpSet(SRpcIpSet *ipSet) { - void *pIter = NULL; - while (1) { - SMnodeObj *pMnode = NULL; - pIter = mgmtGetNextMnode(pIter, &pMnode); - if (pMnode == NULL) break; - - strcpy(ipSet->fqdn[ipSet->numOfIps], pMnode->pDnode->dnodeFqdn); - ipSet->port[ipSet->numOfIps] = htons(pMnode->pDnode->dnodePort); - - if (pMnode->role == TAOS_SYNC_ROLE_MASTER) { - ipSet->inUse = ipSet->numOfIps; - } +void mgmtUpdateMnodeIpSet() { + SRpcIpSet *ipSet = &tsMnodeRpcIpSet; + SDMMnodeInfos *mnodes = &tsMnodeInfos; - ipSet->numOfIps++; - - mgmtDecMnodeRef(pMnode); - } - sdbFreeIter(pIter); -} + mgmtMnodeWrLock(); -void mgmtGetMnodeInfos(void *param) { - SDMMnodeInfos *mnodes = param; - mnodes->inUse = 0; - int32_t index = 0; - void *pIter = NULL; + void * pIter = NULL; while (1) { SMnodeObj *pMnode = NULL; pIter = mgmtGetNextMnode(pIter, &pMnode); if (pMnode == NULL) break; + strcpy(ipSet->fqdn[ipSet->numOfIps], pMnode->pDnode->dnodeFqdn); + ipSet->port[ipSet->numOfIps] = htons(pMnode->pDnode->dnodePort); + mnodes->nodeInfos[index].nodeId = htonl(pMnode->mnodeId); strcpy(mnodes->nodeInfos[index].nodeEp, pMnode->pDnode->dnodeEp); + if (pMnode->role == TAOS_SYNC_ROLE_MASTER) { + ipSet->inUse = ipSet->numOfIps; mnodes->inUse = index; } + ipSet->numOfIps++; index++; + mgmtDecMnodeRef(pMnode); } - sdbFreeIter(pIter); mnodes->nodeNum = index; + sdbFreeIter(pIter); + + mgmtMnodeUnLock(); +} + +void mgmtGetMnodeIpSet(SRpcIpSet *ipSet) { + mgmtMnodeRdLock(); + *ipSet = tsMnodeRpcIpSet; + mgmtMnodeUnLock(); +} + +void mgmtGetMnodeInfos(void *mnodeInfos) { + mgmtMnodeRdLock(); + *(SDMMnodeInfos *)mnodeInfos = tsMnodeInfos; + mgmtMnodeUnLock(); } int32_t mgmtAddMnode(int32_t dnodeId) { @@ -240,6 +264,8 @@ int32_t mgmtAddMnode(int32_t dnodeId) { code = TSDB_CODE_SDB_ERROR; } + mgmtUpdateMnodeIpSet(); + return code; } @@ -250,6 +276,8 @@ void mgmtDropMnodeLocal(int32_t dnodeId) { sdbDeleteRow(&oper); mgmtDecMnodeRef(pMnode); } + + mgmtUpdateMnodeIpSet(); } int32_t mgmtDropMnode(int32_t dnodeId) { @@ -270,6 +298,9 @@ int32_t mgmtDropMnode(int32_t dnodeId) { } sdbDecRef(tsMnodeSdb, pMnode); + + mgmtUpdateMnodeIpSet(); + return code; } diff --git a/src/mnode/src/mgmtSdb.c b/src/mnode/src/mgmtSdb.c index 1e936d15420871bc5cd0806caae74def0f4753e1..d72eacd4649f4f53ef8300c6ffbeb8695bbe4194 100644 --- a/src/mnode/src/mgmtSdb.c +++ b/src/mnode/src/mgmtSdb.c @@ -196,6 +196,8 @@ void sdbUpdateMnodeRoles() { mgmtDecMnodeRef(pMnode); } } + + mgmtUpdateMnodeIpSet(); } static uint32_t sdbGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size, uint64_t *fversion) {