提交 167ecea7 编写于 作者: S Shungang Li

fix: check term for role time

1. role_time is the time at which the mnode took on its current role in the current term
2. show 0 for offline mnodes
上级 610f9d19
...@@ -1180,6 +1180,8 @@ typedef struct { ...@@ -1180,6 +1180,8 @@ typedef struct {
typedef struct { typedef struct {
int8_t syncState; int8_t syncState;
int8_t syncRestore; int8_t syncRestore;
int64_t syncTerm;
int64_t roleTimeMs;
} SMnodeLoad; } SMnodeLoad;
typedef struct { typedef struct {
......
...@@ -239,29 +239,31 @@ typedef struct SSyncState { ...@@ -239,29 +239,31 @@ typedef struct SSyncState {
ESyncState state; ESyncState state;
bool restored; bool restored;
bool canRead; bool canRead;
SyncTerm term;
int64_t roleTimeMs;
} SSyncState; } SSyncState;
int32_t syncInit(); int32_t syncInit();
void syncCleanUp(); void syncCleanUp();
int64_t syncOpen(SSyncInfo* pSyncInfo); int64_t syncOpen(SSyncInfo* pSyncInfo);
int32_t syncStart(int64_t rid); int32_t syncStart(int64_t rid);
void syncStop(int64_t rid); void syncStop(int64_t rid);
void syncPreStop(int64_t rid); void syncPreStop(int64_t rid);
void syncPostStop(int64_t rid); void syncPostStop(int64_t rid);
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq); int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq);
int32_t syncIsCatchUp(int64_t rid); int32_t syncIsCatchUp(int64_t rid);
ESyncRole syncGetRole(int64_t rid); ESyncRole syncGetRole(int64_t rid);
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg); int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg);
int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg); int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg);
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex); int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex);
int32_t syncEndSnapshot(int64_t rid); int32_t syncEndSnapshot(int64_t rid);
int32_t syncLeaderTransfer(int64_t rid); int32_t syncLeaderTransfer(int64_t rid);
int32_t syncStepDown(int64_t rid, SyncTerm newTerm); int32_t syncStepDown(int64_t rid, SyncTerm newTerm);
bool syncIsReadyForRead(int64_t rid); bool syncIsReadyForRead(int64_t rid);
bool syncSnapshotSending(int64_t rid); bool syncSnapshotSending(int64_t rid);
bool syncSnapshotRecving(int64_t rid); bool syncSnapshotRecving(int64_t rid);
int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq); int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq);
int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg); int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg);
SSyncState syncGetState(int64_t rid); SSyncState syncGetState(int64_t rid);
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet); void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
......
...@@ -33,7 +33,7 @@ static const SSysDbTableSchema dnodesSchema[] = { ...@@ -33,7 +33,7 @@ static const SSysDbTableSchema dnodesSchema[] = {
{.name = "support_vnodes", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true}, {.name = "support_vnodes", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
{.name = "status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, {.name = "status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
{.name = "reboot_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true}, {.name = "reboot_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
{.name = "note", .bytes = 256 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, {.name = "note", .bytes = 256 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
#ifdef TD_ENTERPRISE #ifdef TD_ENTERPRISE
{.name = "active_code", .bytes = TSDB_ACTIVE_KEY_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, {.name = "active_code", .bytes = TSDB_ACTIVE_KEY_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
...@@ -47,7 +47,7 @@ static const SSysDbTableSchema mnodesSchema[] = { ...@@ -47,7 +47,7 @@ static const SSysDbTableSchema mnodesSchema[] = {
{.name = "role", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, {.name = "role", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, {.name = "status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
{.name = "reboot_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true}, {.name = "role_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
}; };
static const SSysDbTableSchema modulesSchema[] = { static const SSysDbTableSchema modulesSchema[] = {
...@@ -73,7 +73,7 @@ static const SSysDbTableSchema clusterSchema[] = { ...@@ -73,7 +73,7 @@ static const SSysDbTableSchema clusterSchema[] = {
{.name = "name", .bytes = TSDB_CLUSTER_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, {.name = "name", .bytes = TSDB_CLUSTER_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "uptime", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true}, {.name = "uptime", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
{.name = "version", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, {.name = "version", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "expire_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true}, {.name = "expire_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = true},
}; };
......
...@@ -1101,6 +1101,8 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { ...@@ -1101,6 +1101,8 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tEncodeI64(&encoder, pReq->qload.timeInFetchQueue) < 0) return -1; if (tEncodeI64(&encoder, pReq->qload.timeInFetchQueue) < 0) return -1;
if (tEncodeI32(&encoder, pReq->statusSeq) < 0) return -1; if (tEncodeI32(&encoder, pReq->statusSeq) < 0) return -1;
if (tEncodeI64(&encoder, pReq->mload.syncTerm) < 0) return -1;
if (tEncodeI64(&encoder, pReq->mload.roleTimeMs) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
...@@ -1183,6 +1185,13 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { ...@@ -1183,6 +1185,13 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tDecodeI64(&decoder, &pReq->qload.timeInFetchQueue) < 0) return -1; if (tDecodeI64(&decoder, &pReq->qload.timeInFetchQueue) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->statusSeq) < 0) return -1; if (tDecodeI32(&decoder, &pReq->statusSeq) < 0) return -1;
pReq->mload.syncTerm = -1;
pReq->mload.roleTimeMs = 0;
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI64(&decoder, &pReq->mload.syncTerm) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->mload.roleTimeMs) < 0) return -1;
}
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return 0; return 0;
......
...@@ -216,8 +216,9 @@ typedef struct { ...@@ -216,8 +216,9 @@ typedef struct {
int64_t createdTime; int64_t createdTime;
int64_t updateTime; int64_t updateTime;
ESyncState syncState; ESyncState syncState;
SyncTerm syncTerm;
bool syncRestore; bool syncRestore;
int64_t stateStartTime; int64_t roleTimeMs;
SDnodeObj* pDnode; SDnodeObj* pDnode;
int32_t role; int32_t role;
SyncIndex lastIndex; SyncIndex lastIndex;
......
...@@ -524,13 +524,23 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { ...@@ -524,13 +524,23 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
SMnodeObj *pObj = mndAcquireMnode(pMnode, pDnode->id); SMnodeObj *pObj = mndAcquireMnode(pMnode, pDnode->id);
if (pObj != NULL) { if (pObj != NULL) {
if (pObj->syncState != statusReq.mload.syncState || pObj->syncRestore != statusReq.mload.syncRestore) { bool roleChanged = pObj->syncState != statusReq.mload.syncState ||
mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d", pObj->id, syncStr(pObj->syncState), (statusReq.mload.syncTerm != -1 && pObj->syncTerm != statusReq.mload.syncTerm);
syncStr(statusReq.mload.syncState), pObj->syncRestore, statusReq.mload.syncRestore); bool restoreChanged = pObj->syncRestore != statusReq.mload.syncRestore;
if (roleChanged || restoreChanged) {
mInfo("dnode:%d, mnode syncState from %s to %s, restoreState from %d to %d, syncTerm from %" PRId64
" to %" PRId64,
pObj->id, syncStr(pObj->syncState), syncStr(statusReq.mload.syncState), pObj->syncRestore,
statusReq.mload.syncRestore, pObj->syncTerm, statusReq.mload.syncTerm);
pObj->syncState = statusReq.mload.syncState; pObj->syncState = statusReq.mload.syncState;
pObj->syncRestore = statusReq.mload.syncRestore; pObj->syncRestore = statusReq.mload.syncRestore;
pObj->stateStartTime = taosGetTimestampMs(); pObj->syncTerm = statusReq.mload.syncTerm;
} }
if (roleChanged) {
pObj->roleTimeMs = (statusReq.mload.roleTimeMs != 0) ? statusReq.mload.roleTimeMs : taosGetTimestampMs();
}
mndReleaseMnode(pMnode, pObj); mndReleaseMnode(pMnode, pObj);
} }
......
...@@ -890,7 +890,10 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) { ...@@ -890,7 +890,10 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
SSyncState state = syncGetState(pMnode->syncMgmt.sync); SSyncState state = syncGetState(pMnode->syncMgmt.sync);
pLoad->syncState = state.state; pLoad->syncState = state.state;
pLoad->syncRestore = state.restored; pLoad->syncRestore = state.restored;
mTrace("mnode current syncState is %s, syncRestore:%d", syncStr(pLoad->syncState), pLoad->syncRestore); pLoad->syncTerm = state.term;
pLoad->roleTimeMs = state.roleTimeMs;
mTrace("mnode current syncState is %s, syncRestore:%d, syncTerm:%" PRId64 " ,roleTimeMs:%" PRId64,
syncStr(pLoad->syncState), pLoad->syncRestore, pLoad->syncTerm, pLoad->roleTimeMs);
return 0; return 0;
} }
......
...@@ -319,7 +319,7 @@ static int32_t mndBuildCreateMnodeRedoAction(STrans *pTrans, SDCreateMnodeReq *p ...@@ -319,7 +319,7 @@ static int32_t mndBuildCreateMnodeRedoAction(STrans *pTrans, SDCreateMnodeReq *p
return 0; return 0;
} }
static int32_t mndBuildAlterMnodeTypeRedoAction(STrans *pTrans, static int32_t mndBuildAlterMnodeTypeRedoAction(STrans *pTrans,
SDAlterMnodeTypeReq *pAlterMnodeTypeReq, SEpSet *pAlterMnodeTypeEpSet) { SDAlterMnodeTypeReq *pAlterMnodeTypeReq, SEpSet *pAlterMnodeTypeEpSet) {
int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, pAlterMnodeTypeReq); int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, pAlterMnodeTypeReq);
void *pReq = taosMemoryMalloc(contLen); void *pReq = taosMemoryMalloc(contLen);
...@@ -803,9 +803,17 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB ...@@ -803,9 +803,17 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
int32_t numOfRows = 0; int32_t numOfRows = 0;
int32_t cols = 0; int32_t cols = 0;
SMnodeObj *pObj = NULL; SMnodeObj *pObj = NULL;
SMnodeObj *pSelfObj = NULL;
ESdbStatus objStatus = 0; ESdbStatus objStatus = 0;
char *pWrite; char *pWrite;
int64_t curMs = taosGetTimestampMs(); int64_t curMs = taosGetTimestampMs();
int64_t dummyTimeMs = 0;
pSelfObj = sdbAcquire(pSdb, SDB_MNODE, &pMnode->selfDnodeId);
if (pSelfObj == NULL) {
mError("mnode:%d, failed to acquire self %s", pMnode->selfDnodeId, terrstr());
goto _out;
}
while (numOfRows < rows) { while (numOfRows < rows) {
pShow->pIter = sdbFetchAll(pSdb, SDB_MNODE, pShow->pIter, (void **)&pObj, &objStatus, true); pShow->pIter = sdbFetchAll(pSdb, SDB_MNODE, pShow->pIter, (void **)&pObj, &objStatus, true);
...@@ -825,7 +833,8 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB ...@@ -825,7 +833,8 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
if (pObj->id == pMnode->selfDnodeId) { if (pObj->id == pMnode->selfDnodeId) {
snprintf(role, sizeof(role), "%s%s", syncStr(TAOS_SYNC_STATE_LEADER), pMnode->restored ? "" : "*"); snprintf(role, sizeof(role), "%s%s", syncStr(TAOS_SYNC_STATE_LEADER), pMnode->restored ? "" : "*");
} }
if (mndIsDnodeOnline(pObj->pDnode, curMs)) { bool isDnodeOnline = mndIsDnodeOnline(pObj->pDnode, curMs);
if (isDnodeOnline) {
tstrncpy(role, syncStr(pObj->syncState), sizeof(role)); tstrncpy(role, syncStr(pObj->syncState), sizeof(role));
if (pObj->syncState == TAOS_SYNC_STATE_LEADER && pObj->id != pMnode->selfDnodeId) { if (pObj->syncState == TAOS_SYNC_STATE_LEADER && pObj->id != pMnode->selfDnodeId) {
tstrncpy(role, syncStr(TAOS_SYNC_STATE_ERROR), sizeof(role)); tstrncpy(role, syncStr(TAOS_SYNC_STATE_ERROR), sizeof(role));
...@@ -840,7 +849,7 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB ...@@ -840,7 +849,7 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
const char *status = "ready"; const char *status = "ready";
if (objStatus == SDB_STATUS_CREATING) status = "creating"; if (objStatus == SDB_STATUS_CREATING) status = "creating";
if (objStatus == SDB_STATUS_DROPPING) status = "dropping"; if (objStatus == SDB_STATUS_DROPPING) status = "dropping";
if (!mndIsDnodeOnline(pObj->pDnode, curMs)) status = "offline"; if (!isDnodeOnline) status = "offline";
char b3[9 + VARSTR_HEADER_SIZE] = {0}; char b3[9 + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(b3, status, pShow->pMeta->pSchemas[cols].bytes); STR_WITH_MAXSIZE_TO_VARSTR(b3, status, pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
...@@ -850,7 +859,15 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB ...@@ -850,7 +859,15 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->createdTime, false); colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->createdTime, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->stateStartTime, false); if (pObj->syncTerm != pSelfObj->syncTerm || !isDnodeOnline) {
// state of old term / no status report => use dummyTimeMs
if (pObj->syncTerm > pSelfObj->syncTerm) {
mError("mnode:%d has a newer term:%" PRId64 " than me:%" PRId64, pObj->id, pObj->syncTerm, pSelfObj->syncTerm);
}
colDataSetVal(pColInfo, numOfRows, (const char *)&dummyTimeMs, false);
} else {
colDataSetVal(pColInfo, numOfRows, (const char *)&pObj->roleTimeMs, false);
}
numOfRows++; numOfRows++;
sdbRelease(pSdb, pObj); sdbRelease(pSdb, pObj);
...@@ -858,6 +875,8 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB ...@@ -858,6 +875,8 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
pShow->numOfRows += numOfRows; pShow->numOfRows += numOfRows;
_out:
sdbRelease(pSdb, pSelfObj);
return numOfRows; return numOfRows;
} }
...@@ -999,12 +1018,12 @@ static void mndReloadSyncConfig(SMnode *pMnode) { ...@@ -999,12 +1018,12 @@ static void mndReloadSyncConfig(SMnode *pMnode) {
} }
if (pMnode->syncMgmt.sync > 0) { if (pMnode->syncMgmt.sync > 0) {
mInfo("vgId:1, mnode sync reconfig, totalReplica:%d replica:%d myIndex:%d", mInfo("vgId:1, mnode sync reconfig, totalReplica:%d replica:%d myIndex:%d",
cfg.totalReplicaNum, cfg.replicaNum, cfg.myIndex); cfg.totalReplicaNum, cfg.replicaNum, cfg.myIndex);
for (int32_t i = 0; i < cfg.totalReplicaNum; ++i) { for (int32_t i = 0; i < cfg.totalReplicaNum; ++i) {
SNodeInfo *pNode = &cfg.nodeInfo[i]; SNodeInfo *pNode = &cfg.nodeInfo[i];
mInfo("vgId:1, index:%d, ep:%s:%u dnode:%d cluster:%" PRId64 " role:%d", i, pNode->nodeFqdn, pNode->nodePort, mInfo("vgId:1, index:%d, ep:%s:%u dnode:%d cluster:%" PRId64 " role:%d", i, pNode->nodeFqdn, pNode->nodePort,
pNode->nodeId, pNode->clusterId, pNode->nodeRole); pNode->nodeId, pNode->clusterId, pNode->nodeRole);
} }
......
...@@ -213,7 +213,7 @@ typedef struct SSyncNode { ...@@ -213,7 +213,7 @@ typedef struct SSyncNode {
int64_t minMatchIndex; int64_t minMatchIndex;
int64_t startTime; int64_t startTime;
int64_t leaderTime; int64_t roleTimeMs;
int64_t lastReplicateTime; int64_t lastReplicateTime;
int32_t electNum; int32_t electNum;
......
...@@ -508,12 +508,14 @@ SSyncState syncGetState(int64_t rid) { ...@@ -508,12 +508,14 @@ SSyncState syncGetState(int64_t rid) {
SSyncNode* pSyncNode = syncNodeAcquire(rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode != NULL) { if (pSyncNode != NULL) {
state.state = pSyncNode->state; state.state = pSyncNode->state;
state.roleTimeMs = pSyncNode->roleTimeMs;
state.restored = pSyncNode->restoreFinish; state.restored = pSyncNode->restoreFinish;
if (pSyncNode->vgId != 1) { if (pSyncNode->vgId != 1) {
state.canRead = syncNodeIsReadyForRead(pSyncNode); state.canRead = syncNodeIsReadyForRead(pSyncNode);
} else { } else {
state.canRead = state.restored; state.canRead = state.restored;
} }
state.term = raftStoreGetTerm(pSyncNode);
syncNodeRelease(pSyncNode); syncNodeRelease(pSyncNode);
} }
...@@ -898,6 +900,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { ...@@ -898,6 +900,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
// init TLA+ server vars // init TLA+ server vars
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
pSyncNode->roleTimeMs = taosGetTimestampMs();
if (raftStoreOpen(pSyncNode) != 0) { if (raftStoreOpen(pSyncNode) != 0) {
sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath); sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath);
goto _error; goto _error;
...@@ -1035,7 +1038,6 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { ...@@ -1035,7 +1038,6 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
int64_t timeNow = taosGetTimestampMs(); int64_t timeNow = taosGetTimestampMs();
pSyncNode->startTime = timeNow; pSyncNode->startTime = timeNow;
pSyncNode->leaderTime = timeNow;
pSyncNode->lastReplicateTime = timeNow; pSyncNode->lastReplicateTime = timeNow;
// snapshotting // snapshotting
...@@ -1131,6 +1133,7 @@ int32_t syncNodeStart(SSyncNode* pSyncNode) { ...@@ -1131,6 +1133,7 @@ int32_t syncNodeStart(SSyncNode* pSyncNode) {
int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) { int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
// state change // state change
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
pSyncNode->roleTimeMs = taosGetTimestampMs();
syncNodeStopHeartbeatTimer(pSyncNode); syncNodeStopHeartbeatTimer(pSyncNode);
// reset elect timer, long enough // reset elect timer, long enough
...@@ -1667,6 +1670,7 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { ...@@ -1667,6 +1670,7 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
// state change // state change
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
pSyncNode->roleTimeMs = taosGetTimestampMs();
syncNodeStopHeartbeatTimer(pSyncNode); syncNodeStopHeartbeatTimer(pSyncNode);
// trace log // trace log
...@@ -1695,6 +1699,7 @@ void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr) { ...@@ -1695,6 +1699,7 @@ void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr) {
// state change // state change
pSyncNode->state = TAOS_SYNC_STATE_LEARNER; pSyncNode->state = TAOS_SYNC_STATE_LEARNER;
pSyncNode->roleTimeMs = taosGetTimestampMs();
// trace log // trace log
sNTrace(pSyncNode, "become learner %s", debugStr); sNTrace(pSyncNode, "become learner %s", debugStr);
...@@ -1730,8 +1735,6 @@ void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr) { ...@@ -1730,8 +1735,6 @@ void syncNodeBecomeLearner(SSyncNode* pSyncNode, const char* debugStr) {
// /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>> // /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
// //
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
pSyncNode->leaderTime = taosGetTimestampMs();
pSyncNode->becomeLeaderNum++; pSyncNode->becomeLeaderNum++;
pSyncNode->hbrSlowNum = 0; pSyncNode->hbrSlowNum = 0;
...@@ -1740,6 +1743,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { ...@@ -1740,6 +1743,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
// state change // state change
pSyncNode->state = TAOS_SYNC_STATE_LEADER; pSyncNode->state = TAOS_SYNC_STATE_LEADER;
pSyncNode->roleTimeMs = taosGetTimestampMs();
// set leader cache // set leader cache
pSyncNode->leaderCache = pSyncNode->myRaftId; pSyncNode->leaderCache = pSyncNode->myRaftId;
...@@ -1839,6 +1843,7 @@ int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) { ...@@ -1839,6 +1843,7 @@ int32_t syncNodePeerStateInit(SSyncNode* pSyncNode) {
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) { void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER); ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);
pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE; pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
pSyncNode->roleTimeMs = taosGetTimestampMs();
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
sInfo("vgId:%d, become candidate from follower. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64, sInfo("vgId:%d, become candidate from follower. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64,
pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex); pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册