提交 d476b6db 编写于 作者: S Shengliang Guan

enh: update epset on dnode info changed

上级 da6722d3
......@@ -39,7 +39,7 @@ typedef enum {
QUEUE_MAX,
} EQueueType;
typedef void (*UpdateDnodeInfoFp)(void* pData, int32_t* dnodeId, int64_t* clusterId, char* fqdn, uint16_t* port);
typedef bool (*UpdateDnodeInfoFp)(void* pData, int32_t* dnodeId, int64_t* clusterId, char* fqdn, uint16_t* port);
typedef int32_t (*PutToQueueFp)(void* pMgmt, EQueueType qtype, SRpcMsg* pMsg);
typedef int32_t (*GetQueueSizeFp)(void* pMgmt, int32_t vgId, EQueueType qtype);
typedef int32_t (*SendReqFp)(const SEpSet* pEpSet, SRpcMsg* pMsg);
......@@ -70,7 +70,7 @@ void tmsgSendRsp(SRpcMsg* pMsg);
void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg);
void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type);
void tmsgReportStartup(const char* name, const char* desc);
void tmsgUpdateDnodeInfo(int32_t* dnodeId, int64_t* clusterId, char* fqdn, uint16_t* port);
bool tmsgUpdateDnodeInfo(int32_t* dnodeId, int64_t* clusterId, char* fqdn, uint16_t* port);
void tmsgUpdateDnodeEpSet(SEpSet* epset);
#ifdef __cplusplus
......
......@@ -79,10 +79,6 @@ static void dmClearVars(SDnode *pDnode) {
SDnodeData *pData = &pDnode->data;
taosThreadRwlockWrlock(&pData->lock);
if (pData->dnodeEps != NULL) {
taosArrayDestroy(pData->dnodeEps);
pData->dnodeEps = NULL;
}
if (pData->oldDnodeEps != NULL) {
if (dmWriteEps(pData) == 0) {
dmRemoveDnodePairs(pData);
......@@ -90,6 +86,10 @@ static void dmClearVars(SDnode *pDnode) {
taosArrayDestroy(pData->oldDnodeEps);
pData->oldDnodeEps = NULL;
}
if (pData->dnodeEps != NULL) {
taosArrayDestroy(pData->dnodeEps);
pData->dnodeEps = NULL;
}
if (pData->dnodeHash != NULL) {
taosHashCleanup(pData->dnodeHash);
pData->dnodeHash = NULL;
......
......@@ -168,7 +168,7 @@ void dmUpdateEps(SDnodeData *pData, SArray *pDnodeEps);
void dmGetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet);
void dmGetMnodeEpSetForRedirect(SDnodeData *pData, SRpcMsg *pMsg, SEpSet *pEpSet);
void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet);
void dmUpdateDnodeInfo(void *pData, int32_t *dnodeId, int64_t *clusterId, char *fqdn, uint16_t *port);
bool dmUpdateDnodeInfo(void *pData, int32_t *dnodeId, int64_t *clusterId, char *fqdn, uint16_t *port);
void dmRemoveDnodePairs(SDnodeData *pData);
#ifdef __cplusplus
......
......@@ -146,7 +146,7 @@ int32_t dmReadEps(SDnodeData *pData) {
}
code = 0;
dInfo("succceed to read mnode file %s", file);
dInfo("succceed to read dnode file %s", file);
_OVER:
if (content != NULL) taosMemoryFree(content);
......@@ -172,7 +172,7 @@ _OVER:
dDebug("reset dnode list on startup");
dmResetEps(pData, pData->dnodeEps);
if (dmIsEpChanged(pData, pData->dnodeId, tsLocalEp)) {
if (pData->dnodeEps == NULL && dmIsEpChanged(pData, pData->dnodeId, tsLocalEp)) {
dError("localEp %s different with %s and need reconfigured", tsLocalEp, file);
return -1;
}
......@@ -236,7 +236,8 @@ int32_t dmWriteEps(SDnodeData *pData) {
code = 0;
pData->updateTime = taosGetTimestampMs();
dInfo("succeed to write dnode file:%s, dnodeVer:%" PRId64, realfile, pData->dnodeVer);
dInfo("succeed to write dnode file:%s, num:%d ver:%" PRId64, realfile, (int32_t)taosArrayGetSize(pData->dnodeEps),
pData->dnodeVer);
_OVER:
if (pJson != NULL) tjsonDelete(pJson);
......@@ -346,7 +347,8 @@ void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) {
}
}
void dmUpdateDnodeInfo(void *data, int32_t *did, int64_t *clusterId, char *fqdn, uint16_t *port) {
bool dmUpdateDnodeInfo(void *data, int32_t *did, int64_t *clusterId, char *fqdn, uint16_t *port) {
bool updated = false;
SDnodeData *pData = data;
int32_t dnodeId = -1;
if (did != NULL) dnodeId = *did;
......@@ -361,6 +363,7 @@ void dmUpdateDnodeInfo(void *data, int32_t *did, int64_t *clusterId, char *fqdn,
dInfo("dnode:%d, update ep:%s:%u to %s:%u", dnodeId, fqdn, *port, pair->newFqdn, pair->newPort);
tstrncpy(fqdn, pair->newFqdn, TSDB_FQDN_LEN);
*port = pair->newPort;
updated = true;
}
}
}
......@@ -384,12 +387,14 @@ void dmUpdateDnodeInfo(void *data, int32_t *did, int64_t *clusterId, char *fqdn,
dInfo("dnode:%d, update ep:%s:%u to %s:%u", dnodeId, fqdn, *port, pDnodeEp->ep.fqdn, pDnodeEp->ep.port);
tstrncpy(fqdn, pDnodeEp->ep.fqdn, TSDB_FQDN_LEN);
*port = pDnodeEp->ep.port;
updated = true;
}
if (clusterId != NULL) *clusterId = pData->clusterId;
}
}
taosThreadRwlockUnlock(&pData->lock);
return updated;
}
static int32_t dmDecodeEpPairs(SJson *pJson, SDnodeData *pData) {
......@@ -476,7 +481,7 @@ static int32_t dmReadDnodePairs(SDnodeData *pData) {
goto _OVER;
}
pData->oldDnodeEps = taosArrayInit(1, sizeof(SDnodeEp));
pData->oldDnodeEps = taosArrayInit(1, sizeof(SDnodeEpPair));
if (pData->oldDnodeEps == NULL) {
dError("failed to calloc dnodeEp array since %s", strerror(errno));
goto _OVER;
......@@ -490,7 +495,7 @@ static int32_t dmReadDnodePairs(SDnodeData *pData) {
}
code = 0;
dInfo("succceed to read mnode file %s", file);
dInfo("succceed to read dnode file %s", file);
_OVER:
if (content != NULL) taosMemoryFree(content);
......@@ -505,40 +510,39 @@ _OVER:
SDnodeEpPair *pair = taosArrayGet(pData->oldDnodeEps, i);
for (int32_t j = 0; j < (int32_t)taosArrayGetSize(pData->dnodeEps); ++j) {
SDnodeEp *pDnodeEp = taosArrayGet(pData->dnodeEps, j);
if (strcmp(pDnodeEp->ep.fqdn, pair->oldFqdn) == 0 && pDnodeEp->ep.port == pair->oldPort) {
dInfo("dnode:%d, will update ep:%s:%u to %s:%u in array", pDnodeEp->id, pDnodeEp->ep.fqdn, pDnodeEp->ep.port,
pair->newFqdn, pair->newPort);
if (pDnodeEp->id != pair->id &&
(strcmp(pDnodeEp->ep.fqdn, pair->newFqdn) == 0 && pDnodeEp->ep.port == pair->newPort)) {
dError("dnode:%d, can't update ep:%s:%u to %s:%u since already exists as dnode:%d", pair->id, pair->oldFqdn,
pair->oldPort, pair->newFqdn, pair->newPort, pDnodeEp->id);
tstrncpy(pDnodeEp->ep.fqdn, pair->newFqdn, TSDB_FQDN_LEN);
pDnodeEp->ep.port = pair->newPort;
}
}
void *pIter = taosHashIterate(pData->dnodeHash, NULL);
while (pIter) {
SDnodeEp *pDnodeEp = pIter;
if (strcmp(pDnodeEp->ep.fqdn, pair->oldFqdn) == 0 && pDnodeEp->ep.port == pair->oldPort) {
dDebug("dnode:%d, will update ep:%s:%u to %s:%u in hash", pDnodeEp->id, pDnodeEp->ep.fqdn, pDnodeEp->ep.port,
pair->newFqdn, pair->newPort);
#if 0
if (pDnodeEp->id == pair->id &&
(strcmp(pDnodeEp->ep.fqdn, pair->oldFqdn) == 0 && pDnodeEp->ep.port == pair->oldPort)) {
dError("dnode:%d, can't update ep:%s:%u to %s:%u since endpoint not matched", pair->id, pair->oldFqdn,
pair->oldPort, pair->newFqdn, pair->newPort, pDnodeEp->id);
tstrncpy(pDnodeEp->ep.fqdn, pair->newFqdn, TSDB_FQDN_LEN);
pDnodeEp->ep.port = pair->newPort;
}
pIter = taosHashIterate(pData->dnodeHash, pIter);
#endif
}
}
if (taosArrayGetSize(pData->dnodeEps) == 0) {
SDnodeEp dnodeEp = {0};
dnodeEp.isMnode = 1;
taosGetFqdnPortFromEp(tsFirst, &dnodeEp.ep);
taosArrayPush(pData->dnodeEps, &dnodeEp);
for (int32_t i = 0; i < (int32_t)taosArrayGetSize(pData->oldDnodeEps); ++i) {
SDnodeEpPair *pair = taosArrayGet(pData->oldDnodeEps, i);
for (int32_t j = 0; j < (int32_t)taosArrayGetSize(pData->dnodeEps); ++j) {
SDnodeEp *pDnodeEp = taosArrayGet(pData->dnodeEps, j);
if (strcmp(pDnodeEp->ep.fqdn, pair->oldFqdn) == 0 && pDnodeEp->ep.port == pair->oldPort) {
dInfo("dnode:%d, will update ep:%s:%u to %s:%u", pDnodeEp->id, pDnodeEp->ep.fqdn, pDnodeEp->ep.port,
pair->newFqdn, pair->newPort);
tstrncpy(pDnodeEp->ep.fqdn, pair->newFqdn, TSDB_FQDN_LEN);
pDnodeEp->ep.port = pair->newPort;
}
}
dDebug("reset dnode list on startup");
dmResetEps(pData, pData->dnodeEps);
if (dmIsEpChanged(pData, pData->dnodeId, tsLocalEp)) {
dError("localEp %s different with %s and need reconfigured", tsLocalEp, file);
return -1;
}
pData->dnodeVer = 0;
return code;
}
......@@ -180,7 +180,9 @@ static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
SDB_GET_RESERVE(pRaw, dataPos, TSDB_DNODE_RESERVE_SIZE, _OVER)
terrno = 0;
tmsgUpdateDnodeInfo(&pDnode->id, NULL, pDnode->fqdn, &pDnode->port);
if (tmsgUpdateDnodeInfo(&pDnode->id, NULL, pDnode->fqdn, &pDnode->port)) {
mInfo("dnode:%d, endpoint changed", pDnode->id);
}
_OVER:
if (terrno != 0) {
......@@ -189,7 +191,7 @@ _OVER:
return NULL;
}
mTrace("dnode:%d, decode from raw:%p, row:%p", pDnode->id, pRaw, pDnode);
mTrace("dnode:%d, decode from raw:%p, row:%p ep:%s:%u", pDnode->id, pRaw, pDnode, pDnode->fqdn, pDnode->port);
return pRow;
}
......
......@@ -747,7 +747,7 @@ static void mndReloadSyncConfig(SMnode *pMnode) {
pNode->clusterId = mndGetClusterId(pMnode);
pNode->nodePort = pObj->pDnode->port;
tstrncpy(pNode->nodeFqdn, pObj->pDnode->fqdn, TSDB_FQDN_LEN);
tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
(void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
mInfo("vgId:1, ep:%s:%u dnode:%d", pNode->nodeFqdn, pNode->nodePort, pNode->nodeId);
if (pObj->pDnode->id == pMnode->selfDnodeId) {
cfg.myIndex = cfg.replicaNum;
......
......@@ -303,7 +303,7 @@ int32_t mndInitSync(SMnode *pMnode) {
pNode->nodeId = pMgmt->replicas[i].id;
pNode->nodePort = pMgmt->replicas[i].port;
tstrncpy(pNode->nodeFqdn, pMgmt->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
(void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
mInfo("vgId:1, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId,
pNode->clusterId);
}
......
......@@ -86,7 +86,7 @@ int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs) {
pNode->nodeId = pReq->replicas[i].id;
pNode->nodePort = pReq->replicas[i].port;
tstrncpy(pNode->nodeFqdn, pReq->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
(void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
vInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId);
}
......@@ -134,6 +134,21 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
return NULL;
}
// save vnode info on dnode ep changed
bool updated = false;
SSyncCfg *pCfg = &info.config.syncCfg;
for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
SNodeInfo *pNode = &pCfg->nodeInfo[i];
if (tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort)) {
updated = true;
}
}
if (updated) {
vInfo("vgId:%d, save vnode info since dnode info changed", info.config.vgId);
(void)vnodeSaveInfo(dir, &info);
(void)vnodeCommitInfo(dir, &info);
}
// create handle
pVnode = taosMemoryCalloc(1, sizeof(*pVnode) + strlen(path) + 1);
if (pVnode == NULL) {
......
......@@ -895,14 +895,25 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
// init by SSyncInfo
pSyncNode->vgId = pSyncInfo->vgId;
SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;
bool updated = false;
sInfo("vgId:%d, start to open sync node, replica:%d selfIndex:%d", pSyncNode->vgId, pCfg->replicaNum, pCfg->myIndex);
for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
SNodeInfo* pNode = &pCfg->nodeInfo[i];
tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
if (tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort)) {
updated = true;
}
sInfo("vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%" PRId64, pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort,
pNode->nodeId, pNode->clusterId);
}
if (updated) {
sInfo("vgId:%d, save config info since dnode info changed", pSyncNode->vgId);
if (syncWriteCfgFile(pSyncNode) != 0) {
sError("vgId:%d, failed to write sync cfg file on dnode info updated", pSyncNode->vgId);
goto _error;
}
}
pSyncNode->pWal = pSyncInfo->pWal;
pSyncNode->msgcb = pSyncInfo->msgcb;
pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
......
......@@ -59,8 +59,8 @@ void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type) { (*defaultMsgCb.re
void tmsgReportStartup(const char* name, const char* desc) { (*defaultMsgCb.reportStartupFp)(name, desc); }
void tmsgUpdateDnodeInfo(int32_t* dnodeId, int64_t* clusterId, char* fqdn, uint16_t* port) {
(*defaultMsgCb.updateDnodeInfoFp)(defaultMsgCb.data, dnodeId, clusterId, fqdn, port);
bool tmsgUpdateDnodeInfo(int32_t* dnodeId, int64_t* clusterId, char* fqdn, uint16_t* port) {
return (*defaultMsgCb.updateDnodeInfoFp)(defaultMsgCb.data, dnodeId, clusterId, fqdn, port);
}
void tmsgUpdateDnodeEpSet(SEpSet* epset) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册