提交 3e13cd82 编写于 作者: B Benguang Zhao

fix: check if timer triggered ahead of time in syncNodeElect

上级 ab2f4e97
......@@ -696,6 +696,8 @@ int32_t syncNodeOnSnapshotReply(SSyncNode* ths, SyncSnapshotRsp* pMsg);
int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg);
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg);
void syncNodePegLastMsgRecvTime(SSyncNode* ths);
// -----------------------------------------
typedef int32_t (*FpOnPingCb)(SSyncNode* ths, SyncPing* pMsg);
typedef int32_t (*FpOnPingReplyCb)(SSyncNode* ths, SyncPingReply* pMsg);
......
......@@ -281,7 +281,7 @@ typedef enum ELogicConditionType {
#define TSDB_DNODE_ROLE_VNODE 2
#define TSDB_MAX_REPLICA 5
#define TSDB_SYNC_LOG_BUFFER_SIZE 512
#define TSDB_SYNC_LOG_BUFFER_SIZE 1024
#define TSDB_TBNAME_COLUMN_INDEX (-1)
#define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta
......
......@@ -298,6 +298,7 @@ typedef struct SSyncNode {
int64_t startTime;
int64_t leaderTime;
int64_t lastReplicateTime;
int64_t lastMsgRecvTime;
} SSyncNode;
......
......@@ -148,7 +148,7 @@ int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Syn
*pBarrier = syncLogIsReplicationBarrier(pEntry);
prevLogTerm = syncLogReplMgrGetPrevLogTerm(pMgr, pNode, index);
if (prevLogTerm < 0 && terrno != TSDB_CODE_SUCCESS) {
if (prevLogTerm < 0) {
sError("vgId:%d, failed to get prev log term since %s. index: %" PRId64 "", pNode->vgId, terrstr(), index);
goto _out;
}
......@@ -163,7 +163,7 @@ int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Syn
(void)syncNodeSendAppendEntries(pNode, pDestId, pMsgOut);
ret = 0;
sInfo("vgId:%d, replicate one msg index: %" PRId64 " term: %" PRId64 " prevterm: %" PRId64 " to dest: 0x%016" PRIx64,
sDebug("vgId:%d, replicate one msg index: %" PRId64 " term: %" PRId64 " prevterm: %" PRId64 " to dest: 0x%016" PRIx64,
pNode->vgId, pEntry->index, pEntry->term, prevLogTerm, pDestId->addr);
_out:
......
......@@ -34,6 +34,13 @@
int32_t syncNodeElect(SSyncNode* pSyncNode) {
syncNodeEventLog(pSyncNode, "begin election");
int64_t nowMs = taosGetMonoTimestampMs();
if (nowMs < pSyncNode->lastMsgRecvTime + pSyncNode->electTimerMS) {
sError("vgId:%d, election timer triggered ahead of time for %" PRId64 "ms", pSyncNode->vgId,
pSyncNode->lastMsgRecvTime + pSyncNode->electTimerMS - nowMs);
return -1;
}
int32_t ret = 0;
if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
syncNodeFollower2Candidate(pSyncNode);
......@@ -105,7 +112,11 @@ int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) {
int32_t syncNodeSendRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg) {
int32_t ret = 0;
syncLogSendRequestVote(pSyncNode, pMsg, "");
// syncLogSendRequestVote(pSyncNode, pMsg, "");
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sInfo("vgId:%d, send request vote of term: %" PRId64 " to %s:%d", pSyncNode->vgId, pMsg->term, host, port);
SRpcMsg rpcMsg;
syncRequestVote2RpcMsg(pMsg, &rpcMsg);
......
......@@ -1167,15 +1167,18 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod
SSyncLogBuffer* pBuf = pNode->pLogBuf;
SRaftId destId = pMsg->srcId;
ASSERT(pMgr->restored == false);
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
if (pMgr->endIndex == 0) {
ASSERT(pMgr->startIndex == 0);
ASSERT(pMgr->matchIndex == 0);
if (pMsg->matchIndex < 0) {
pMgr->restored = true;
sInfo("vgId:%d, sync log repl mgr of the %d'th peer restored. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64
sInfo("vgId:%d, sync log repl mgr of peer %s:%d restored. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64
"), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, pMgr->peerId, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
pNode->vgId, host, port, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
return 0;
}
......@@ -1189,9 +1192,9 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod
if (pMsg->matchIndex == pMsg->lastSendIndex) {
pMgr->restored = true;
sInfo("vgId:%d, sync log repl mgr of the %d'th peer restored. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64
sInfo("vgId:%d, sync log repl mgr of peer %s:%d restored. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64
"), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, pMgr->peerId, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
pNode->vgId, host, port, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
return 0;
}
......@@ -1278,7 +1281,7 @@ int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode
}
SSyncLogBuffer* pBuf = pNode->pLogBuf;
sInfo("vgId:%d, attempted to probe the %d'th peer with msg of index:%" PRId64 " term: %" PRId64
sDebug("vgId:%d, attempted to probe the %d'th peer with msg of index:%" PRId64 " term: %" PRId64
". pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
")",
pNode->vgId, pMgr->peerId, index, term, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
......@@ -1963,6 +1966,8 @@ int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) {
int32_t ret = 0;
int32_t electMS;
syncNodePegLastMsgRecvTime(pSyncNode);
if (pSyncNode->pRaftCfg->isStandBy) {
electMS = TIMER_MAX_MS;
} else {
......@@ -3231,6 +3236,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
}
syncTimeoutDestroy(pSyncMsg);
#if 0
// reset timer ms
if (syncEnvIsStart() && pSyncNode->electBaseLine > 0) {
pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
......@@ -3239,6 +3245,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
} else {
sError("sync env is stop, syncNodeEqElectTimer");
}
#endif
}
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
......@@ -3246,6 +3253,10 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
syncNodeEventLog(pSyncNode, "eq hb timer");
#if 0
sInfo("vgId:%d, heartbeat timer tick.", pSyncNode->vgId);
#endif
if (pSyncNode->replicaNum > 1) {
if (atomic_load_64(&pSyncNode->heartbeatTimerLogicClockUser) <=
atomic_load_64(&pSyncNode->heartbeatTimerLogicClock)) {
......@@ -3484,6 +3495,11 @@ int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
return ret;
}
// Stamp the node with the current time reported by taosGetMonoTimestampMs().
// The value is stored in ths->lastMsgRecvTime; syncNodeElect() compares it
// (plus electTimerMS) against the current time to detect an election timer
// that fired ahead of schedule.
void syncNodePegLastMsgRecvTime(SSyncNode* ths) {
  ths->lastMsgRecvTime = taosGetMonoTimestampMs();
}
int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) {
syncLogRecvHeartbeat(ths, pMsg, "");
......@@ -3497,6 +3513,13 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) {
SRpcMsg rpcMsg;
syncHeartbeatReply2RpcMsg(pMsgReply, &rpcMsg);
#if 0
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sInfo("vgId:%d, recv heartbeat msg from %s:%d", ths->vgId, host, port);
#endif
#if 1
if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
syncNodeStepDown(ths, pMsg->term);
......
......@@ -239,6 +239,13 @@ int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* destRaftId, c
int32_t ret = 0;
syncLogSendHeartbeat(pSyncNode, pMsg, "");
#if 0
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sInfo("vgId:%d, send heartbeat msg to %s:%d", pSyncNode->vgId, host, port);
#endif
SRpcMsg rpcMsg;
syncHeartbeat2RpcMsg(pMsg, &rpcMsg);
syncNodeSendMsgById(&(pMsg->destId), pSyncNode, &rpcMsg);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册