diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index 7cf092b14417186d8e36f23d6d870809779a2638..eaac319141521a7e80f2f64a02e5580f7375bd74 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -71,6 +71,7 @@ void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg); void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type); void tmsgReportStartup(const char* name, const char* desc); void tmsgUpdateDnodeInfo(int32_t* dnodeId, int64_t* clusterId, char* fqdn, uint16_t* port); +void tmsgUpdateDnodeEpSet(SEpSet* epset); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/node_util/inc/dmUtil.h b/source/dnode/mgmt/node_util/inc/dmUtil.h index 8ecce20f537d429775b3f827e08019f965c4ba39..784bb3c5e132cc5d012521ced390742009ad3262 100644 --- a/source/dnode/mgmt/node_util/inc/dmUtil.h +++ b/source/dnode/mgmt/node_util/inc/dmUtil.h @@ -100,6 +100,7 @@ typedef struct { bool stopped; SEpSet mnodeEps; SArray *dnodeEps; + SArray *oldDnodeEps; SHashObj *dnodeHash; TdThreadRwlock lock; SMsgCb msgCb; diff --git a/source/dnode/mgmt/node_util/src/dmEps.c b/source/dnode/mgmt/node_util/src/dmEps.c index 9640c0b5c95710e5479e7842769a1d6bc57eb5a9..4285eb5c0786f75c25850e5c18c2689ab17869b5 100644 --- a/source/dnode/mgmt/node_util/src/dmEps.c +++ b/source/dnode/mgmt/node_util/src/dmEps.c @@ -332,40 +332,48 @@ void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) { } } -void dmUpdateDnodeInfo(void *data, int32_t *dnodeId, int64_t *clusterId, char *fqdn, uint16_t *port) { +void dmUpdateDnodeInfo(void *data, int32_t *did, int64_t *clusterId, char *fqdn, uint16_t *port) { SDnodeData *pData = data; - int32_t ret = -1; + int32_t dnodeId = -1; + if (did != NULL) dnodeId = *did; + taosThreadRwlockRdlock(&pData->lock); - if (*dnodeId <= 0) { - for (int32_t i = 0; i < (int32_t)taosArrayGetSize(pData->dnodeEps); ++i) { + + if (pData->oldDnodeEps != NULL) { + int32_t size = (int32_t)taosArrayGetSize(pData->oldDnodeEps); + for (int32_t i = 0; i < size; ++i) { + SDnodeEp *pDnodeEp = taosArrayGet(pData->oldDnodeEps, i); + if (strcmp(pDnodeEp->ep.fqdn, fqdn) == 0 && pDnodeEp->ep.port == *port) { + dInfo("dnode:%d, update ep:%s:%u to %s:%u", dnodeId, fqdn, *port, pDnodeEp->ep.fqdn, pDnodeEp->ep.port); + tstrncpy(fqdn, pDnodeEp->ep.fqdn, TSDB_FQDN_LEN); + *port = pDnodeEp->ep.port; + } + } + } + + if (did != NULL && dnodeId <= 0) { + int32_t size = (int32_t)taosArrayGetSize(pData->dnodeEps); + for (int32_t i = 0; i < size; ++i) { SDnodeEp *pDnodeEp = taosArrayGet(pData->dnodeEps, i); if (strcmp(pDnodeEp->ep.fqdn, fqdn) == 0 && pDnodeEp->ep.port == *port) { - dInfo("dnode:%s:%u, update dnodeId from %d to %d", fqdn, *port, *dnodeId, pDnodeEp->id); - *dnodeId = pDnodeEp->id; + dInfo("dnode:%s:%u, update dnodeId to dnode:%d", fqdn, *port, pDnodeEp->id); + *did = pDnodeEp->id; if (clusterId != NULL) *clusterId = pData->clusterId; - ret = 0; } } - if (ret != 0) { - dInfo("dnode:%s:%u, failed to update dnodeId:%d", fqdn, *port, *dnodeId); - } - } else { - SDnodeEp *pDnodeEp = taosHashGet(pData->dnodeHash, dnodeId, sizeof(int32_t)); + } + + if (dnodeId > 0) { + SDnodeEp *pDnodeEp = taosHashGet(pData->dnodeHash, &dnodeId, sizeof(int32_t)); if (pDnodeEp) { - if (strcmp(pDnodeEp->ep.fqdn, fqdn) != 0) { - dInfo("dnode:%d, update port from %s to %s", *dnodeId, fqdn, pDnodeEp->ep.fqdn); + if (strcmp(pDnodeEp->ep.fqdn, fqdn) != 0 || pDnodeEp->ep.port != *port) { + dInfo("dnode:%d, update ep:%s:%u to %s:%u", dnodeId, fqdn, *port, pDnodeEp->ep.fqdn, pDnodeEp->ep.port); tstrncpy(fqdn, pDnodeEp->ep.fqdn, TSDB_FQDN_LEN); - } - if (pDnodeEp->ep.port != *port) { - dInfo("dnode:%d, update port from %u to %u", *dnodeId, *port, pDnodeEp->ep.port); *port = pDnodeEp->ep.port; } if (clusterId != NULL) *clusterId = pData->clusterId; - ret = 0; - } else { - dInfo("dnode:%d, failed to update dnode info", *dnodeId); } } + taosThreadRwlockUnlock(&pData->lock); - // return ret; } \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 37e2c3522512de3e4b8bb82741df50e09d8c598b..3bbf4a4279ea5f90829aea0d5bb96a04366e3cb4 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -742,6 +742,7 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { if (tDecodeSMqConsumerObj(buf, pConsumer) == NULL) { goto CM_DECODE_OVER; } + tmsgUpdateDnodeEpSet(&pConsumer->ep); terrno = TSDB_CODE_SUCCESS; diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index ddb54a95ead10ec1e129cd4337a53335d8cf8da1..f4e6aad7a72f6514183b5e0bb83d77ee831a5ee3 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -180,6 +180,7 @@ static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) { SDB_GET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER) terrno = 0; + tmsgUpdateDnodeInfo(&pDnode->id, NULL, pDnode->fqdn, &pDnode->port); _OVER: if (terrno != 0) { diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index b8ef185199c5b93c57ab4d2e68d30bb12c797373..153bb8bd04b73bf3c025851a0f2d00b1f6908eac 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -760,6 +760,27 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) { goto SUB_DECODE_OVER; } + // update epset saved in mnode + if (pSub->unassignedVgs != NULL) { + int32_t size = (int32_t)taosArrayGetSize(pSub->unassignedVgs); + for (int32_t i = 0; i < size; ++i) { + SMqVgEp *pMqVgEp = taosArrayGet(pSub->unassignedVgs, i); + tmsgUpdateDnodeEpSet(&pMqVgEp->epSet); + } + } + if (pSub->consumerHash != NULL) { + void *pIter = taosHashIterate(pSub->consumerHash, NULL); + while (pIter) { + SMqConsumerEp *pConsumerEp = pIter; + int32_t size = (int32_t)taosArrayGetSize(pConsumerEp->vgs); + for (int32_t i = 0; i < size; ++i) { + SMqVgEp *pMqVgEp = taosArrayGet(pConsumerEp->vgs, i); + tmsgUpdateDnodeEpSet(&pMqVgEp->epSet); + } + pIter = taosHashIterate(pSub->consumerHash, pIter); + } + } + terrno = TSDB_CODE_SUCCESS; SUB_DECODE_OVER: diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 718fc5c73f27e96fb261effbdc54c73310a3e924..dfcd55bcba7ca6a46a79cac2351df18a135218db 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -329,6 +329,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { action.pRaw = NULL; } else if (action.actionType == TRANS_ACTION_MSG) { SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER); + tmsgUpdateDnodeEpSet(&action.epSet); SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER) SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgSent*/, _OVER) SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgReceived*/, _OVER) diff --git a/source/libs/transport/src/tmsgcb.c b/source/libs/transport/src/tmsgcb.c index eea0d876840a7fe44d6a14ecb6723061b3d2ee75..af2528bc9247adab655227bc8761f66ed1678491 100644 --- a/source/libs/transport/src/tmsgcb.c +++ b/source/libs/transport/src/tmsgcb.c @@ -62,3 +62,9 @@ void tmsgReportStartup(const char* name, const char* desc) { (*defaultMsgCb.repo void tmsgUpdateDnodeInfo(int32_t* dnodeId, int64_t* clusterId, char* fqdn, uint16_t* port) { (*defaultMsgCb.updateDnodeInfoFp)(defaultMsgCb.data, dnodeId, clusterId, fqdn, port); } + +void tmsgUpdateDnodeEpSet(SEpSet* epset) { + for (int32_t i = 0; i < epset->numOfEps; ++i) { + tmsgUpdateDnodeInfo(NULL, NULL, epset->eps[i].fqdn, &epset->eps[i].port); + } +} \ No newline at end of file