From b638d0ef8c582160a9a39fa48152ed67e24592ee Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 25 May 2022 16:41:38 +0800 Subject: [PATCH] refactor: let mnode report sync state --- include/common/tmsg.h | 1 + include/dnode/mnode/mnode.h | 1 + source/common/src/tmsg.c | 5 +++ source/dnode/mgmt/mgmt_dnode/src/dmHandle.c | 3 +- source/dnode/mgmt/mgmt_mnode/src/mmInt.c | 9 ++++-- source/dnode/mgmt/node_util/inc/dmUtil.h | 4 +-- source/dnode/mnode/impl/inc/mndDef.h | 5 ++- source/dnode/mnode/impl/inc/mndInt.h | 3 +- source/dnode/mnode/impl/inc/mndMnode.h | 1 - source/dnode/mnode/impl/src/mndDnode.c | 27 +++++++++++----- source/dnode/mnode/impl/src/mndMnode.c | 35 +++++---------------- source/dnode/mnode/impl/src/mndSync.c | 5 ++- source/dnode/mnode/impl/src/mnode.c | 18 +++++------ 13 files changed, 58 insertions(+), 59 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 32cb739535..de26af48ee 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -950,6 +950,7 @@ typedef struct { int32_t numOfCores; int32_t numOfSupportVnodes; char dnodeEp[TSDB_EP_LEN]; + SMnodeLoad mload; SClusterCfg clusterCfg; SArray* pVloads; // array of SVnodeLoad } SStatusReq; diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 69260d720c..bc14dc3210 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -29,6 +29,7 @@ extern "C" { typedef struct SMnode SMnode; typedef struct { + int32_t dnodeId; bool standby; bool deploy; int8_t replica; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 7f886b078a..93a592f120 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -891,6 +891,9 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { if (tEncodeI64(&encoder, pload->pointsWritten) < 0) return -1; } + // mnode loads + if (tEncodeI32(&encoder, pReq->mload.syncState) < 0) return -1; + tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -946,6 +949,8 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { } } + if (tDecodeI32(&decoder, &pReq->mload.syncState) < 0) return -1; + tEndDecode(&decoder); tDecoderClear(&decoder); return 0; diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index f7337f482f..bb2c069eaa 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -75,8 +75,9 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { (*pMgmt->getVnodeLoadsFp)(&vinfo); req.pVloads = vinfo.pVloads; - SMonMloadInfo minfo = {0}; + SMonMloadInfo minfo = {0}; (*pMgmt->getMnodeLoadsFp)(&minfo); + req.mload = minfo.load; int32_t contLen = tSerializeSStatusReq(NULL, 0, &req); void *pHead = rpcMallocCont(contLen); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c index 875be9768c..964c2d42b7 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c @@ -42,6 +42,8 @@ static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInpu pOption->standby = false; pOption->deploy = true; pOption->msgCb = pMgmt->msgCb; + pOption->dnodeId = pMgmt->pData->dnodeId; + pOption->replica = 1; pOption->selfIndex = 0; @@ -52,9 +54,10 @@ static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInpu } static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { - pOption->msgCb = pMgmt->msgCb; pOption->deploy = false; pOption->standby = false; + pOption->msgCb = pMgmt->msgCb; + pOption->dnodeId = pMgmt->pData->dnodeId; if (pMgmt->replica > 0) { pOption->standby = true; @@ -71,9 +74,11 @@ static void mmBuildOptionForOpen(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { } static int32_t mmBuildOptionForAlter(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCreateMnodeReq *pCreate) { - pOption->msgCb = pMgmt->msgCb; pOption->standby = false; pOption->deploy = false; + pOption->msgCb = pMgmt->msgCb; + pOption->dnodeId = pMgmt->pData->dnodeId; + pOption->replica = pCreate->replica; pOption->selfIndex = -1; diff --git a/source/dnode/mgmt/node_util/inc/dmUtil.h b/source/dnode/mgmt/node_util/inc/dmUtil.h index 4946669678..0d921c2e8b 100644 --- a/source/dnode/mgmt/node_util/inc/dmUtil.h +++ b/source/dnode/mgmt/node_util/inc/dmUtil.h @@ -90,8 +90,8 @@ typedef enum { typedef int32_t (*ProcessCreateNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg); typedef int32_t (*ProcessDropNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg); typedef void (*SendMonitorReportFp)(); -typedef void (*GetVnodeLoadsFp)(); -typedef void (*GetMnodeLoadsFp)(); +typedef void (*GetVnodeLoadsFp)(SMonVloadInfo *pInfo); +typedef void (*GetMnodeLoadsFp)(SMonMloadInfo *pInfo); typedef struct { int32_t dnodeId; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 432b95059d..26cfaa62ff 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -199,9 +199,8 @@ typedef struct { int32_t id; int64_t createdTime; int64_t updateTime; - ESyncState role; - int32_t roleTerm; - int64_t roleTime; + ESyncState state; + int64_t stateStartTime; SDnodeObj* pDnode; } SMnodeObj; diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 489e1aec5c..189ea82bfc 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -78,7 +78,6 @@ typedef struct { SWal *pWal; sem_t syncSem; int64_t sync; - ESyncState state; bool standby; bool restored; int32_t errCode; @@ -90,7 +89,7 @@ typedef struct { } SGrantInfo; typedef struct SMnode { - int32_t selfId; + int32_t selfDnodeId; int64_t clusterId; TdThread thread; bool deploy; diff --git a/source/dnode/mnode/impl/inc/mndMnode.h b/source/dnode/mnode/impl/inc/mndMnode.h index a5cdfa1061..fd62b3ce75 100644 --- a/source/dnode/mnode/impl/inc/mndMnode.h +++ b/source/dnode/mnode/impl/inc/mndMnode.h @@ -28,7 +28,6 @@ SMnodeObj *mndAcquireMnode(SMnode *pMnode, int32_t mnodeId); void mndReleaseMnode(SMnode *pMnode, SMnodeObj *pObj); bool mndIsMnode(SMnode *pMnode, int32_t dnodeId); void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet); -void mndUpdateMnodeRole(SMnode *pMnode); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 1f6dc03dc3..047562ec02 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -58,14 +58,16 @@ static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter); int32_t mndInitDnode(SMnode *pMnode) { - SSdbTable table = {.sdbType = SDB_DNODE, - .keyType = SDB_KEY_INT32, - .deployFp = (SdbDeployFp)mndCreateDefaultDnode, - .encodeFp = (SdbEncodeFp)mndDnodeActionEncode, - .decodeFp = (SdbDecodeFp)mndDnodeActionDecode, - .insertFp = (SdbInsertFp)mndDnodeActionInsert, - .updateFp = (SdbUpdateFp)mndDnodeActionUpdate, - .deleteFp = (SdbDeleteFp)mndDnodeActionDelete}; + SSdbTable table = { + .sdbType = SDB_DNODE, + .keyType = SDB_KEY_INT32, + .deployFp = (SdbDeployFp)mndCreateDefaultDnode, + .encodeFp = (SdbEncodeFp)mndDnodeActionEncode, + .decodeFp = (SdbDecodeFp)mndDnodeActionDecode, + .insertFp = (SdbInsertFp)mndDnodeActionInsert, + .updateFp = (SdbUpdateFp)mndDnodeActionUpdate, + .deleteFp = (SdbDeleteFp)mndDnodeActionDelete, + }; mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DNODE, mndProcessCreateDnodeReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_DNODE, mndProcessDropDnodeReq); @@ -377,6 +379,15 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { mndReleaseVgroup(pMnode, pVgroup); } + SMnodeObj *pObj = mndAcquireMnode(pMnode, pDnode->id); + if (pObj != NULL) { + if (pObj->state != statusReq.mload.syncState) { + pObj->state = statusReq.mload.syncState; + pObj->stateStartTime = taosGetTimestampMs(); + } + mndReleaseMnode(pMnode, pObj); + } + int64_t curMs = taosGetTimestampMs(); bool online = mndIsDnodeOnline(pMnode, pDnode, curMs); bool dnodeChanged = (statusReq.dnodeVer != sdbGetTableVer(pMnode->pSdb, SDB_DNODE)); diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 14d33414fa..93284a95c5 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -77,28 +77,6 @@ void mndReleaseMnode(SMnode *pMnode, SMnodeObj *pObj) { sdbRelease(pMnode->pSdb, pObj); } -void mndUpdateMnodeRole(SMnode *pMnode) { - SSdb *pSdb = pMnode->pSdb; - void *pIter = NULL; - while (1) { - SMnodeObj *pObj = NULL; - pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj); - if (pIter == NULL) break; - - ESyncState lastRole = pObj->role; - if (pObj->id == 1) { - pObj->role = TAOS_SYNC_STATE_LEADER; - } else { - pObj->role = TAOS_SYNC_STATE_CANDIDATE; - } - if (pObj->role != lastRole) { - pObj->roleTime = taosGetTimestampMs(); - } - - sdbRelease(pSdb, pObj); - } -} - static int32_t mndCreateDefaultMnode(SMnode *pMnode) { SMnodeObj mnodeObj = {0}; mnodeObj.id = 1; @@ -209,7 +187,7 @@ static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj) { return -1; } - pObj->role = TAOS_SYNC_STATE_FOLLOWER; + pObj->state = TAOS_SYNC_STATE_ERROR; return 0; } @@ -253,7 +231,7 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) { if (pObj->pDnode == NULL) { mError("mnode:%d, no corresponding dnode exists", pObj->id); } else { - if (pObj->role == TAOS_SYNC_STATE_LEADER) { + if (pObj->state == TAOS_SYNC_STATE_LEADER) { pEpSet->inUse = pEpSet->numOfEps; } addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port); @@ -581,7 +559,7 @@ static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq) { goto _OVER; } - if (pMnode->selfId == dropReq.dnodeId) { + if (pMnode->selfDnodeId == dropReq.dnodeId) { terrno = TSDB_CODE_MND_CANT_DROP_MASTER; goto _OVER; } @@ -652,10 +630,11 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, b1, false); - // const char *roles = syncStr(syncGetMyRole(pMnode->syncMgmt.sync)); - const char *roles = syncStr(TAOS_SYNC_STATE_FOLLOWER); - if (pObj->id == pMnode->selfId) { + const char *roles = NULL; + if (pObj->id == pMnode->selfDnodeId) { roles = syncStr(TAOS_SYNC_STATE_LEADER); + } else { + roles = syncStr(pObj->state); } char *b2 = taosMemoryCalloc(1, 12 + VARSTR_HEADER_SIZE); STR_WITH_MAXSIZE_TO_VARSTR(b2, roles, pShow->pMeta->pSchemas[cols].bytes); diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 231cecf58e..f6b07fd880 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -196,9 +196,8 @@ void mndSyncStop(SMnode *pMnode) {} bool mndIsMaster(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; - pMgmt->state = syncGetMyRole(pMgmt->sync); - - return (pMgmt->state == TAOS_SYNC_STATE_LEADER) && (pMnode->syncMgmt.restored); + ESyncState state = syncGetMyRole(pMgmt->sync); + return (state == TAOS_SYNC_STATE_LEADER) && (pMnode->syncMgmt.restored); } int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) { diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 5cc19f8bee..4e4f69e01d 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -264,7 +264,7 @@ static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { pMnode->selfIndex = pOption->selfIndex; memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); pMnode->msgCb = pOption->msgCb; - pMnode->selfId = pOption->replicas[pOption->selfIndex].id; + pMnode->selfDnodeId = pOption->dnodeId; pMnode->syncMgmt.standby = pOption->standby; } @@ -318,7 +318,6 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { return NULL; } - mndUpdateMnodeRole(pMnode); mDebug("mnode open successfully "); return pMnode; } @@ -519,16 +518,17 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr SMonMnodeDesc desc = {0}; desc.mnode_id = pObj->id; tstrncpy(desc.mnode_ep, pObj->pDnode->ep, sizeof(desc.mnode_ep)); - tstrncpy(desc.role, syncStr(TAOS_SYNC_STATE_LEADER), sizeof(desc.role)); - // tstrncpy(desc.role, syncStr(pObj->role), sizeof(desc.role)); - taosArrayPush(pClusterInfo->mnodes, &desc); - sdbRelease(pSdb, pObj); - if (pObj->id == pMnode->selfId) { + if (pObj->id == pMnode->selfDnodeId) { pClusterInfo->first_ep_dnode_id = pObj->id; tstrncpy(pClusterInfo->first_ep, pObj->pDnode->ep, sizeof(pClusterInfo->first_ep)); - pClusterInfo->master_uptime = (ms - pObj->roleTime) / (86400000.0f); + pClusterInfo->master_uptime = (ms - pObj->stateStartTime) / (86400000.0f); + tstrncpy(desc.role, syncStr(TAOS_SYNC_STATE_LEADER), sizeof(desc.role)); + } else { + tstrncpy(desc.role, syncStr(pObj->state), sizeof(desc.role)); } + taosArrayPush(pClusterInfo->mnodes, &desc); + sdbRelease(pSdb, pObj); } // vgroup info @@ -581,6 +581,6 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr } int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) { - pLoad->syncState = pMnode->syncMgmt.state; + pLoad->syncState = syncGetMyRole(pMnode->syncMgmt.sync); return 0; } -- GitLab