From d476b6db13deae5cd81043e3d4316bb240a60486 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 16 Jan 2023 15:41:50 +0800 Subject: [PATCH] enh: update epset on dnode info changed --- include/common/tmsgcb.h | 4 +- source/dnode/mgmt/node_mgmt/src/dmMgmt.c | 8 +-- source/dnode/mgmt/node_util/inc/dmUtil.h | 2 +- source/dnode/mgmt/node_util/src/dmEps.c | 64 +++++++++++++----------- source/dnode/mnode/impl/src/mndDnode.c | 6 ++- source/dnode/mnode/impl/src/mndMnode.c | 2 +- source/dnode/mnode/impl/src/mndSync.c | 2 +- source/dnode/vnode/src/vnd/vnodeOpen.c | 17 ++++++- source/libs/sync/src/syncMain.c | 13 ++++- source/libs/transport/src/tmsgcb.c | 4 +- 10 files changed, 77 insertions(+), 45 deletions(-) diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index eaac319141..eca8740d28 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -39,7 +39,7 @@ typedef enum { QUEUE_MAX, } EQueueType; -typedef void (*UpdateDnodeInfoFp)(void* pData, int32_t* dnodeId, int64_t* clusterId, char* fqdn, uint16_t* port); +typedef bool (*UpdateDnodeInfoFp)(void* pData, int32_t* dnodeId, int64_t* clusterId, char* fqdn, uint16_t* port); typedef int32_t (*PutToQueueFp)(void* pMgmt, EQueueType qtype, SRpcMsg* pMsg); typedef int32_t (*GetQueueSizeFp)(void* pMgmt, int32_t vgId, EQueueType qtype); typedef int32_t (*SendReqFp)(const SEpSet* pEpSet, SRpcMsg* pMsg); @@ -70,7 +70,7 @@ void tmsgSendRsp(SRpcMsg* pMsg); void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg); void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type); void tmsgReportStartup(const char* name, const char* desc); -void tmsgUpdateDnodeInfo(int32_t* dnodeId, int64_t* clusterId, char* fqdn, uint16_t* port); +bool tmsgUpdateDnodeInfo(int32_t* dnodeId, int64_t* clusterId, char* fqdn, uint16_t* port); void tmsgUpdateDnodeEpSet(SEpSet* epset); #ifdef __cplusplus diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c index 16d453abb5..3dd8a19d92 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c @@ -79,10 +79,6 @@ static void dmClearVars(SDnode *pDnode) { SDnodeData *pData = &pDnode->data; taosThreadRwlockWrlock(&pData->lock); - if (pData->dnodeEps != NULL) { - taosArrayDestroy(pData->dnodeEps); - pData->dnodeEps = NULL; - } if (pData->oldDnodeEps != NULL) { if (dmWriteEps(pData) == 0) { dmRemoveDnodePairs(pData); @@ -90,6 +86,10 @@ static void dmClearVars(SDnode *pDnode) { taosArrayDestroy(pData->oldDnodeEps); pData->oldDnodeEps = NULL; } + if (pData->dnodeEps != NULL) { + taosArrayDestroy(pData->dnodeEps); + pData->dnodeEps = NULL; + } if (pData->dnodeHash != NULL) { taosHashCleanup(pData->dnodeHash); pData->dnodeHash = NULL; diff --git a/source/dnode/mgmt/node_util/inc/dmUtil.h b/source/dnode/mgmt/node_util/inc/dmUtil.h index 40b171601d..c2f403dfbb 100644 --- a/source/dnode/mgmt/node_util/inc/dmUtil.h +++ b/source/dnode/mgmt/node_util/inc/dmUtil.h @@ -168,7 +168,7 @@ void dmUpdateEps(SDnodeData *pData, SArray *pDnodeEps); void dmGetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet); void dmGetMnodeEpSetForRedirect(SDnodeData *pData, SRpcMsg *pMsg, SEpSet *pEpSet); void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet); -void dmUpdateDnodeInfo(void *pData, int32_t *dnodeId, int64_t *clusterId, char *fqdn, uint16_t *port); +bool dmUpdateDnodeInfo(void *pData, int32_t *dnodeId, int64_t *clusterId, char *fqdn, uint16_t *port); void dmRemoveDnodePairs(SDnodeData *pData); #ifdef __cplusplus diff --git a/source/dnode/mgmt/node_util/src/dmEps.c b/source/dnode/mgmt/node_util/src/dmEps.c index 8234787baa..0e8a1a0748 100644 --- a/source/dnode/mgmt/node_util/src/dmEps.c +++ b/source/dnode/mgmt/node_util/src/dmEps.c @@ -146,7 +146,7 @@ int32_t dmReadEps(SDnodeData *pData) { } code = 0; - dInfo("succceed to read mnode file %s", file); + dInfo("succceed to read dnode file %s", file); _OVER: if (content != NULL) taosMemoryFree(content); @@ -172,7 +172,7 @@ _OVER: dDebug("reset dnode list on startup"); dmResetEps(pData, pData->dnodeEps); - if (dmIsEpChanged(pData, pData->dnodeId, tsLocalEp)) { + if (pData->dnodeEps == NULL && dmIsEpChanged(pData, pData->dnodeId, tsLocalEp)) { dError("localEp %s different with %s and need reconfigured", tsLocalEp, file); return -1; } @@ -236,7 +236,8 @@ int32_t dmWriteEps(SDnodeData *pData) { code = 0; pData->updateTime = taosGetTimestampMs(); - dInfo("succeed to write dnode file:%s, dnodeVer:%" PRId64, realfile, pData->dnodeVer); + dInfo("succeed to write dnode file:%s, num:%d ver:%" PRId64, realfile, (int32_t)taosArrayGetSize(pData->dnodeEps), + pData->dnodeVer); _OVER: if (pJson != NULL) tjsonDelete(pJson); @@ -346,7 +347,8 @@ void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) { } } -void dmUpdateDnodeInfo(void *data, int32_t *did, int64_t *clusterId, char *fqdn, uint16_t *port) { +bool dmUpdateDnodeInfo(void *data, int32_t *did, int64_t *clusterId, char *fqdn, uint16_t *port) { + bool updated = false; SDnodeData *pData = data; int32_t dnodeId = -1; if (did != NULL) dnodeId = *did; @@ -361,6 +363,7 @@ void dmUpdateDnodeInfo(void *data, int32_t *did, int64_t *clusterId, char *fqdn, dInfo("dnode:%d, update ep:%s:%u to %s:%u", dnodeId, fqdn, *port, pair->newFqdn, pair->newPort); tstrncpy(fqdn, pair->newFqdn, TSDB_FQDN_LEN); *port = pair->newPort; + updated = true; } } } @@ -384,12 +387,14 @@ void dmUpdateDnodeInfo(void *data, int32_t *did, int64_t *clusterId, char *fqdn, dInfo("dnode:%d, update ep:%s:%u to %s:%u", dnodeId, fqdn, *port, pDnodeEp->ep.fqdn, pDnodeEp->ep.port); tstrncpy(fqdn, pDnodeEp->ep.fqdn, TSDB_FQDN_LEN); *port = pDnodeEp->ep.port; + updated = true; } if (clusterId != NULL) *clusterId = pData->clusterId; } } taosThreadRwlockUnlock(&pData->lock); + return updated; } static int32_t dmDecodeEpPairs(SJson *pJson, SDnodeData *pData) { @@ -476,7 +481,7 @@ static int32_t dmReadDnodePairs(SDnodeData *pData) { goto _OVER; } - pData->oldDnodeEps = taosArrayInit(1, sizeof(SDnodeEp)); + pData->oldDnodeEps = taosArrayInit(1, sizeof(SDnodeEpPair)); if (pData->oldDnodeEps == NULL) { dError("failed to calloc dnodeEp array since %s", strerror(errno)); goto _OVER; @@ -490,7 +495,7 @@ static int32_t dmReadDnodePairs(SDnodeData *pData) { } code = 0; - dInfo("succceed to read mnode file %s", file); + dInfo("succceed to read dnode file %s", file); _OVER: if (content != NULL) taosMemoryFree(content); @@ -505,40 +510,39 @@ _OVER: SDnodeEpPair *pair = taosArrayGet(pData->oldDnodeEps, i); for (int32_t j = 0; j < (int32_t)taosArrayGetSize(pData->dnodeEps); ++j) { SDnodeEp *pDnodeEp = taosArrayGet(pData->dnodeEps, j); - if (strcmp(pDnodeEp->ep.fqdn, pair->oldFqdn) == 0 && pDnodeEp->ep.port == pair->oldPort) { - dInfo("dnode:%d, will update ep:%s:%u to %s:%u in array", pDnodeEp->id, pDnodeEp->ep.fqdn, pDnodeEp->ep.port, - pair->newFqdn, pair->newPort); + if (pDnodeEp->id != pair->id && + (strcmp(pDnodeEp->ep.fqdn, pair->newFqdn) == 0 && pDnodeEp->ep.port == pair->newPort)) { + dError("dnode:%d, can't update ep:%s:%u to %s:%u since already exists as dnode:%d", pair->id, pair->oldFqdn, + pair->oldPort, pair->newFqdn, pair->newPort, pDnodeEp->id); tstrncpy(pDnodeEp->ep.fqdn, pair->newFqdn, TSDB_FQDN_LEN); pDnodeEp->ep.port = pair->newPort; } - } - void *pIter = taosHashIterate(pData->dnodeHash, NULL); - while (pIter) { - SDnodeEp *pDnodeEp = pIter; - if (strcmp(pDnodeEp->ep.fqdn, pair->oldFqdn) == 0 && pDnodeEp->ep.port == pair->oldPort) { - dDebug("dnode:%d, will update ep:%s:%u to %s:%u in hash", pDnodeEp->id, pDnodeEp->ep.fqdn, pDnodeEp->ep.port, - pair->newFqdn, pair->newPort); + +#if 0 + if (pDnodeEp->id == pair->id && + (strcmp(pDnodeEp->ep.fqdn, pair->oldFqdn) == 0 && pDnodeEp->ep.port == pair->oldPort)) { + dError("dnode:%d, can't update ep:%s:%u to %s:%u since endpoint not matched", pair->id, pair->oldFqdn, + pair->oldPort, pair->newFqdn, pair->newPort, pDnodeEp->id); tstrncpy(pDnodeEp->ep.fqdn, pair->newFqdn, TSDB_FQDN_LEN); pDnodeEp->ep.port = pair->newPort; } - pIter = taosHashIterate(pData->dnodeHash, pIter); +#endif } } - if (taosArrayGetSize(pData->dnodeEps) == 0) { - SDnodeEp dnodeEp = {0}; - dnodeEp.isMnode = 1; - taosGetFqdnPortFromEp(tsFirst, &dnodeEp.ep); - taosArrayPush(pData->dnodeEps, &dnodeEp); - } - - dDebug("reset dnode list on startup"); - dmResetEps(pData, pData->dnodeEps); - - if (dmIsEpChanged(pData, pData->dnodeId, tsLocalEp)) { - dError("localEp %s different with %s and need reconfigured", tsLocalEp, file); - return -1; + for (int32_t i = 0; i < (int32_t)taosArrayGetSize(pData->oldDnodeEps); ++i) { + SDnodeEpPair *pair = taosArrayGet(pData->oldDnodeEps, i); + for (int32_t j = 0; j < (int32_t)taosArrayGetSize(pData->dnodeEps); ++j) { + SDnodeEp *pDnodeEp = taosArrayGet(pData->dnodeEps, j); + if (strcmp(pDnodeEp->ep.fqdn, pair->oldFqdn) == 0 && pDnodeEp->ep.port == pair->oldPort) { + dInfo("dnode:%d, will update ep:%s:%u to %s:%u", pDnodeEp->id, pDnodeEp->ep.fqdn, pDnodeEp->ep.port, + pair->newFqdn, pair->newPort); + tstrncpy(pDnodeEp->ep.fqdn, pair->newFqdn, TSDB_FQDN_LEN); + pDnodeEp->ep.port = pair->newPort; + } + } } + pData->dnodeVer = 0; return code; } diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index f4e6aad7a7..97490beb3c 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -180,7 +180,9 @@ static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) { SDB_GET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER) terrno = 0; - tmsgUpdateDnodeInfo(&pDnode->id, NULL, pDnode->fqdn, &pDnode->port); + if (tmsgUpdateDnodeInfo(&pDnode->id, NULL, pDnode->fqdn, &pDnode->port)) { + mInfo("dnode:%d, endpoint changed", pDnode->id); + } _OVER: if (terrno != 0) { @@ -189,7 +191,7 @@ _OVER: return NULL; } - mTrace("dnode:%d, decode from raw:%p, row:%p", pDnode->id, pRaw, pDnode); + mTrace("dnode:%d, decode from raw:%p, row:%p ep:%s:%u", pDnode->id, pRaw, pDnode, pDnode->fqdn, pDnode->port); return pRow; } diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 7dcd287fb7..add32fd335 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -747,7 +747,7 @@ static void mndReloadSyncConfig(SMnode *pMnode) { pNode->clusterId = mndGetClusterId(pMnode); pNode->nodePort = pObj->pDnode->port; tstrncpy(pNode->nodeFqdn, pObj->pDnode->fqdn, TSDB_FQDN_LEN); - tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort); + (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort); mInfo("vgId:1, ep:%s:%u dnode:%d", pNode->nodeFqdn, pNode->nodePort, pNode->nodeId); if (pObj->pDnode->id == pMnode->selfDnodeId) { cfg.myIndex = cfg.replicaNum; diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 6b675586e4..7dc0912403 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -303,7 +303,7 @@ int32_t mndInitSync(SMnode *pMnode) { pNode->nodeId = pMgmt->replicas[i].id; pNode->nodePort = pMgmt->replicas[i].port; tstrncpy(pNode->nodeFqdn, pMgmt->replicas[i].fqdn, sizeof(pNode->nodeFqdn)); - tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort); + (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort); mInfo("vgId:1, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId, pNode->clusterId); } diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 743ae17f2b..61cb75b1da 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -86,7 +86,7 @@ int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs) { pNode->nodeId = pReq->replicas[i].id; pNode->nodePort = pReq->replicas[i].port; tstrncpy(pNode->nodeFqdn, pReq->replicas[i].fqdn, sizeof(pNode->nodeFqdn)); - tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort); + (void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort); vInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId); } @@ -134,6 +134,21 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { return NULL; } + // save vnode info on dnode ep changed + bool updated = false; + SSyncCfg *pCfg = &info.config.syncCfg; + for (int32_t i = 0; i < pCfg->replicaNum; ++i) { + SNodeInfo *pNode = &pCfg->nodeInfo[i]; + if (tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort)) { + updated = true; + } + } + if (updated) { + vInfo("vgId:%d, save vnode info since dnode info changed", info.config.vgId); + (void)vnodeSaveInfo(dir, &info); + (void)vnodeCommitInfo(dir, &info); + } + // create handle pVnode = taosMemoryCalloc(1, sizeof(*pVnode) + strlen(path) + 1); if (pVnode == NULL) { diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 02f9795cad..ac377911eb 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -895,14 +895,25 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { // init by SSyncInfo pSyncNode->vgId = pSyncInfo->vgId; SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg; + bool updated = false; sInfo("vgId:%d, start to open sync node, replica:%d selfIndex:%d", pSyncNode->vgId, pCfg->replicaNum, pCfg->myIndex); for (int32_t i = 0; i < pCfg->replicaNum; ++i) { SNodeInfo* pNode = &pCfg->nodeInfo[i]; - tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort); + if (tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort)) { + updated = true; + } sInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId, pNode->clusterId); } + if (updated) { + sInfo("vgId:%d, save config info since dnode info changed", pSyncNode->vgId); + if (syncWriteCfgFile(pSyncNode) != 0) { + sError("vgId:%d, failed to write sync cfg file on dnode info updated", pSyncNode->vgId); + goto _error; + } + } + pSyncNode->pWal = pSyncInfo->pWal; pSyncNode->msgcb = pSyncInfo->msgcb; pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg; diff --git a/source/libs/transport/src/tmsgcb.c b/source/libs/transport/src/tmsgcb.c index af2528bc92..9b8f1dfd07 100644 --- a/source/libs/transport/src/tmsgcb.c +++ b/source/libs/transport/src/tmsgcb.c @@ -59,8 +59,8 @@ void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type) { (*defaultMsgCb.re void tmsgReportStartup(const char* name, const char* desc) { (*defaultMsgCb.reportStartupFp)(name, desc); } -void tmsgUpdateDnodeInfo(int32_t* dnodeId, int64_t* clusterId, char* fqdn, uint16_t* port) { - (*defaultMsgCb.updateDnodeInfoFp)(defaultMsgCb.data, dnodeId, clusterId, fqdn, port); +bool tmsgUpdateDnodeInfo(int32_t* dnodeId, int64_t* clusterId, char* fqdn, uint16_t* port) { + return (*defaultMsgCb.updateDnodeInfoFp)(defaultMsgCb.data, dnodeId, clusterId, fqdn, port); } void tmsgUpdateDnodeEpSet(SEpSet* epset) { -- GitLab