From 167ecea705a142100fe31c01f6a2591b7cf41f40 Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Tue, 25 Jul 2023 19:56:28 +0800 Subject: [PATCH] fix: check term for role time 1. time of mnode becomes current role in this term 2. show 0 for offline mnodes --- include/common/tmsg.h | 2 ++ include/libs/sync/sync.h | 42 ++++++++++++++------------ source/common/src/systable.c | 6 ++-- source/common/src/tmsg.c | 9 ++++++ source/dnode/mnode/impl/inc/mndDef.h | 3 +- source/dnode/mnode/impl/src/mndDnode.c | 18 ++++++++--- source/dnode/mnode/impl/src/mndMain.c | 5 ++- source/dnode/mnode/impl/src/mndMnode.c | 31 +++++++++++++++---- source/libs/sync/inc/syncInt.h | 2 +- source/libs/sync/src/syncMain.c | 11 +++++-- 10 files changed, 90 insertions(+), 39 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 9e100be42a..2ed0d4e0ab 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1180,6 +1180,8 @@ typedef struct { typedef struct { int8_t syncState; int8_t syncRestore; + int64_t syncTerm; + int64_t roleTimeMs; } SMnodeLoad; typedef struct { diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 2a0a4b0f63..88ccf562c7 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -239,29 +239,31 @@ typedef struct SSyncState { ESyncState state; bool restored; bool canRead; + SyncTerm term; + int64_t roleTimeMs; } SSyncState; -int32_t syncInit(); -void syncCleanUp(); -int64_t syncOpen(SSyncInfo* pSyncInfo); -int32_t syncStart(int64_t rid); -void syncStop(int64_t rid); -void syncPreStop(int64_t rid); -void syncPostStop(int64_t rid); -int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq); -int32_t syncIsCatchUp(int64_t rid); +int32_t syncInit(); +void syncCleanUp(); +int64_t syncOpen(SSyncInfo* pSyncInfo); +int32_t syncStart(int64_t rid); +void syncStop(int64_t rid); +void syncPreStop(int64_t rid); +void syncPostStop(int64_t rid); +int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq); +int32_t syncIsCatchUp(int64_t rid); ESyncRole syncGetRole(int64_t rid); -int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg); -int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg); -int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex); -int32_t syncEndSnapshot(int64_t rid); -int32_t syncLeaderTransfer(int64_t rid); -int32_t syncStepDown(int64_t rid, SyncTerm newTerm); -bool syncIsReadyForRead(int64_t rid); -bool syncSnapshotSending(int64_t rid); -bool syncSnapshotRecving(int64_t rid); -int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq); -int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg); +int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg); +int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg); +int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex); +int32_t syncEndSnapshot(int64_t rid); +int32_t syncLeaderTransfer(int64_t rid); +int32_t syncStepDown(int64_t rid, SyncTerm newTerm); +bool syncIsReadyForRead(int64_t rid); +bool syncSnapshotSending(int64_t rid); +bool syncSnapshotRecving(int64_t rid); +int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq); +int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg); SSyncState syncGetState(int64_t rid); void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet); diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 8fd235b9b3..0940fcef6a 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -33,7 +33,7 @@ static const SSysDbTableSchema dnodesSchema[] = { {.name = "support_vnodes", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true}, {.name = "status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true}, - {.name = "reboot_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true}, + {.name = "reboot_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true}, {.name = "note", .bytes = 256 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, #ifdef TD_ENTERPRISE {.name = "active_code", .bytes = TSDB_ACTIVE_KEY_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, @@ -47,7 +47,7 @@ static const SSysDbTableSchema mnodesSchema[] = { {.name = "role", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, {.name = "status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true}, - {.name = "reboot_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true}, + {.name = "role_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true}, }; static const SSysDbTableSchema modulesSchema[] = { @@ -73,7 +73,7 @@ static const SSysDbTableSchema clusterSchema[] = { {.name = "name", .bytes = TSDB_CLUSTER_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, {.name = "uptime", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true}, - {.name = "version", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, + {.name = "version", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, {.name = "expire_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true}, }; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index a29c8556f2..adb22fdb0d 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1101,6 +1101,8 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { if (tEncodeI64(&encoder, pReq->qload.timeInFetchQueue) < 0) return -1; if (tEncodeI32(&encoder, pReq->statusSeq) < 0) return -1; + if (tEncodeI64(&encoder, pReq->mload.syncTerm) < 0) return -1; + if (tEncodeI64(&encoder, pReq->mload.roleTimeMs) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -1183,6 +1185,13 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { if (tDecodeI64(&decoder, &pReq->qload.timeInFetchQueue) < 0) return -1; if (tDecodeI32(&decoder, &pReq->statusSeq) < 0) return -1; + + pReq->mload.syncTerm = -1; + pReq->mload.roleTimeMs = 0; + if (!tDecodeIsEnd(&decoder)) { + if (tDecodeI64(&decoder, &pReq->mload.syncTerm) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->mload.roleTimeMs) < 0) return -1; + } tEndDecode(&decoder); tDecoderClear(&decoder); return 0; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 1f4bc19e33..936181e0e6 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -216,8 +216,9 @@ typedef struct { int64_t createdTime; int64_t updateTime; ESyncState syncState; + SyncTerm syncTerm; bool syncRestore; - int64_t stateStartTime; + int64_t roleTimeMs; SDnodeObj* pDnode; int32_t role; SyncIndex lastIndex; diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index cfd026634c..febe808b44 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -524,13 +524,23 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { SMnodeObj *pObj = mndAcquireMnode(pMnode, pDnode->id); if (pObj != NULL) { - if (pObj->syncState != statusReq.mload.syncState || pObj->syncRestore != statusReq.mload.syncRestore) { - mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d", pObj->id, syncStr(pObj->syncState), - syncStr(statusReq.mload.syncState), pObj->syncRestore, statusReq.mload.syncRestore); + bool roleChanged = pObj->syncState != statusReq.mload.syncState || + (statusReq.mload.syncTerm != -1 && pObj->syncTerm != statusReq.mload.syncTerm); + bool restoreChanged = pObj->syncRestore != statusReq.mload.syncRestore; + if (roleChanged || restoreChanged) { + mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d, syncTerm from %" PRId64 + " to %" PRId64, + pObj->id, syncStr(pObj->syncState), syncStr(statusReq.mload.syncState), pObj->syncRestore, + statusReq.mload.syncRestore, pObj->syncTerm, statusReq.mload.syncTerm); pObj->syncState = statusReq.mload.syncState; pObj->syncRestore = statusReq.mload.syncRestore; - pObj->stateStartTime = taosGetTimestampMs(); + pObj->syncTerm = statusReq.mload.syncTerm; } + + if (roleChanged) { + pObj->roleTimeMs = (statusReq.mload.roleTimeMs != 0) ? statusReq.mload.roleTimeMs : taosGetTimestampMs(); + } + mndReleaseMnode(pMnode, pObj); } diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 79abc57a39..1071a6cf6e 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -890,7 +890,10 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) { SSyncState state = syncGetState(pMnode->syncMgmt.sync); pLoad->syncState = state.state; pLoad->syncRestore = state.restored; - mTrace("mnode current syncState is %s, syncRestore:%d", syncStr(pLoad->syncState), pLoad->syncRestore); + pLoad->syncTerm = state.term; + pLoad->roleTimeMs = state.roleTimeMs; + mTrace("mnode current syncState is %s, syncRestore:%d, syncTerm:%" PRId64 " ,roleTimeMs:%" PRId64, + syncStr(pLoad->syncState), pLoad->syncRestore, pLoad->syncTerm, pLoad->roleTimeMs); return 0; } diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 91fe1257d2..4ee2bc159b 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -319,7 +319,7 @@ static int32_t mndBuildCreateMnodeRedoAction(STrans *pTrans, SDCreateMnodeReq *p return 0; } -static int32_t mndBuildAlterMnodeTypeRedoAction(STrans *pTrans, +static int32_t mndBuildAlterMnodeTypeRedoAction(STrans *pTrans, SDAlterMnodeTypeReq *pAlterMnodeTypeReq, SEpSet *pAlterMnodeTypeEpSet) { int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, pAlterMnodeTypeReq); void *pReq = taosMemoryMalloc(contLen); @@ -803,9 +803,17 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB int32_t numOfRows = 0; int32_t cols = 0; SMnodeObj *pObj = NULL; + SMnodeObj *pSelfObj = NULL; ESdbStatus objStatus = 0; char *pWrite; int64_t curMs = taosGetTimestampMs(); + int64_t dummyTimeMs = 0; + + pSelfObj = sdbAcquire(pSdb, SDB_MNODE, &pMnode->selfDnodeId); + if (pSelfObj == NULL) { + mError("mnode:%d, failed to acquire self %s", pMnode->selfDnodeId, terrstr()); + goto _out; + } while (numOfRows < rows) { pShow->pIter = sdbFetchAll(pSdb, SDB_MNODE, pShow->pIter, (void **)&pObj, &objStatus, true); @@ -825,7 +833,8 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB if (pObj->id == pMnode->selfDnodeId) { snprintf(role, sizeof(role), "%s%s", syncStr(TAOS_SYNC_STATE_LEADER), pMnode->restored ? "" : "*"); } - if (mndIsDnodeOnline(pObj->pDnode, curMs)) { + bool isDnodeOnline = mndIsDnodeOnline(pObj->pDnode, curMs); + if (isDnodeOnline) { tstrncpy(role, syncStr(pObj->syncState), sizeof(role)); if (pObj->syncState == TAOS_SYNC_STATE_LEADER && pObj->id != pMnode->selfDnodeId) { tstrncpy(role, syncStr(TAOS_SYNC_STATE_ERROR), sizeof(role)); @@ -840,7 +849,7 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB const char *status = "ready"; if (objStatus == SDB_STATUS_CREATING) status = "creating"; if (objStatus == SDB_STATUS_DROPPING) status = "dropping"; - if (!mndIsDnodeOnline(pObj->pDnode, curMs)) status = "offline"; + if (!isDnodeOnline) status = "offline"; char b3[9 + VARSTR_HEADER_SIZE] = {0}; STR_WITH_MAXSIZE_TO_VARSTR(b3, status, pShow->pMeta->pSchemas[cols].bytes); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); @@ -850,7 +859,15 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->createdTime, false); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->stateStartTime, false); + if (pObj->syncTerm != pSelfObj->syncTerm || !isDnodeOnline) { + // state of old term / no status report => use dummyTimeMs + if (pObj->syncTerm > pSelfObj->syncTerm) { + mError("mnode:%d has a newer term:%" PRId64 " than me:%" PRId64, pObj->id, pObj->syncTerm, pSelfObj->syncTerm); + } + colDataSetVal(pColInfo, numOfRows, (const char *)&dummyTimeMs, false); + } else { + colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->roleTimeMs, false); + } numOfRows++; sdbRelease(pSdb, pObj); @@ -858,6 +875,8 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB pShow->numOfRows += numOfRows; +_out: + sdbRelease(pSdb, pSelfObj); return numOfRows; } @@ -999,12 +1018,12 @@ static void mndReloadSyncConfig(SMnode *pMnode) { } if (pMnode->syncMgmt.sync > 0) { - mInfo("vgId:1, mnode sync reconfig, totalReplica:%d replica:%d myIndex:%d", + mInfo("vgId:1, mnode sync reconfig, totalReplica:%d replica:%d myIndex:%d", cfg.totalReplicaNum, cfg.replicaNum, cfg.myIndex); for (int32_t i = 0; i < cfg.totalReplicaNum; ++i) { SNodeInfo *pNode = &cfg.nodeInfo[i]; - mInfo("vgId:1, index:%d, ep:%s:%u dnode:%d cluster:%" PRId64 " role:%d", i, pNode->nodeFqdn, pNode->nodePort, + mInfo("vgId:1, index:%d, ep:%s:%u dnode:%d cluster:%" PRId64 " role:%d", i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId, pNode->clusterId, pNode->nodeRole); } diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 7d336c8313..f74e43f47f 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -213,7 +213,7 @@ typedef struct SSyncNode { int64_t minMatchIndex; int64_t startTime; - int64_t leaderTime; + int64_t roleTimeMs; int64_t lastReplicateTime; int32_t electNum; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index f8e21af2c3..f1e3c35a49 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -508,12 +508,14 @@ SSyncState syncGetState(int64_t rid) { SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode != NULL) { state.state = pSyncNode->state; + state.roleTimeMs = pSyncNode->roleTimeMs; state.restored = pSyncNode->restoreFinish; if (pSyncNode->vgId != 1) { state.canRead = syncNodeIsReadyForRead(pSyncNode); } else { state.canRead = state.restored; } + state.term = raftStoreGetTerm(pSyncNode); syncNodeRelease(pSyncNode); } @@ -898,6 +900,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { // init TLA+ server vars pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; + pSyncNode->roleTimeMs = taosGetTimestampMs(); if (raftStoreOpen(pSyncNode) != 0) { sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath); goto _error; @@ -1035,7 +1038,6 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { int64_t timeNow = taosGetTimestampMs(); pSyncNode->startTime = timeNow; - pSyncNode->leaderTime = timeNow; pSyncNode->lastReplicateTime = timeNow; // snapshotting @@ -1131,6 +1133,7 @@ int32_t syncNodeStart(SSyncNode* pSyncNode) { int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) { // state change pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; + pSyncNode->roleTimeMs = taosGetTimestampMs(); syncNodeStopHeartbeatTimer(pSyncNode); // reset elect timer, long enough @@ -1667,6 +1670,7 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { // state change pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; + pSyncNode->roleTimeMs = taosGetTimestampMs(); syncNodeStopHeartbeatTimer(pSyncNode); // trace log @@ -1695,6 +1699,7 @@ void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr) { // state change pSyncNode->state = TAOS_SYNC_STATE_LEARNER; + pSyncNode->roleTimeMs = taosGetTimestampMs(); // trace log sNTrace(pSyncNode, "become learner %s", debugStr); @@ -1730,8 +1735,6 @@ void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr) { // /\ UNCHANGED <> // void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { - pSyncNode->leaderTime = taosGetTimestampMs(); - pSyncNode->becomeLeaderNum++; pSyncNode->hbrSlowNum = 0; @@ -1740,6 +1743,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { // state change pSyncNode->state = TAOS_SYNC_STATE_LEADER; + pSyncNode->roleTimeMs = taosGetTimestampMs(); // set leader cache pSyncNode->leaderCache = pSyncNode->myRaftId; @@ -1839,6 +1843,7 @@ int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) { void syncNodeFollower2Candidate(SSyncNode* pSyncNode) { ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER); pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE; + pSyncNode->roleTimeMs = taosGetTimestampMs(); SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); sInfo("vgId:%d, become candidate from follower. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64, pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex); -- GitLab