提交 437b6843 编写于 作者: S Shengliang Guan

Merge branch '3.0' of https://github.com/taosdata/TDengine into 3.0

...@@ -232,7 +232,7 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, S ...@@ -232,7 +232,7 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, S
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg); int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg);
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode); SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode);
int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h); int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h);
bool syncNodeHeartbeatTimeout(SSyncNode* pSyncNode); bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode);
// raft state change -------------- // raft state change --------------
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term); void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term);
......
...@@ -124,6 +124,7 @@ typedef struct SyncHeartbeat { ...@@ -124,6 +124,7 @@ typedef struct SyncHeartbeat {
SyncIndex commitIndex; SyncIndex commitIndex;
SyncTerm privateTerm; SyncTerm privateTerm;
SyncTerm minMatchIndex; SyncTerm minMatchIndex;
int64_t timeStamp;
} SyncHeartbeat; } SyncHeartbeat;
typedef struct SyncHeartbeatReply { typedef struct SyncHeartbeatReply {
......
...@@ -640,7 +640,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { ...@@ -640,7 +640,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
} }
// heartbeat timeout // heartbeat timeout
if (syncNodeHeartbeatTimeout(pSyncNode)) { if (syncNodeHeartbeatReplyTimeout(pSyncNode)) {
terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY; terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
sNError(pSyncNode, "failed to sync propose since hearbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64, sNError(pSyncNode, "failed to sync propose since hearbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64,
TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex); TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
...@@ -2039,6 +2039,7 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { ...@@ -2039,6 +2039,7 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
pSyncMsg->commitIndex = pSyncNode->commitIndex; pSyncMsg->commitIndex = pSyncNode->commitIndex;
pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode); pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
pSyncMsg->privateTerm = 0; pSyncMsg->privateTerm = 0;
pSyncMsg->timeStamp = taosGetTimestampMs();
// send msg // send msg
syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg); syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
...@@ -2094,7 +2095,7 @@ int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHand ...@@ -2094,7 +2095,7 @@ int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHand
return code; return code;
} }
bool syncNodeHeartbeatTimeout(SSyncNode* pSyncNode) { bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) {
if (pSyncNode->replicaNum == 1) { if (pSyncNode->replicaNum == 1) {
return false; return false;
} }
...@@ -2148,7 +2149,11 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) { ...@@ -2148,7 +2149,11 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) {
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SyncHeartbeat* pMsg = pRpcMsg->pCont; SyncHeartbeat* pMsg = pRpcMsg->pCont;
syncLogRecvHeartbeat(ths, pMsg, "");
int64_t tsMs = taosGetTimestampMs();
char buf[128];
snprintf(buf, sizeof(buf), "recv local time:%" PRId64, tsMs);
syncLogRecvHeartbeat(ths, pMsg, buf);
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
(void)syncBuildHeartbeatReply(&rpcMsg, ths->vgId); (void)syncBuildHeartbeatReply(&rpcMsg, ths->vgId);
...@@ -2161,6 +2166,8 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -2161,6 +2166,8 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
pMsgReply->timeStamp = taosGetTimestampMs(); pMsgReply->timeStamp = taosGetTimestampMs();
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) { if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
syncIndexMgrSetRecvTime(ths->pNextIndex, &(pMsg->srcId), tsMs);
syncNodeResetElectTimer(ths); syncNodeResetElectTimer(ths);
ths->minMatchIndex = pMsg->minMatchIndex; ths->minMatchIndex = pMsg->minMatchIndex;
...@@ -2220,9 +2227,11 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -2220,9 +2227,11 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SyncHeartbeatReply* pMsg = pRpcMsg->pCont; SyncHeartbeatReply* pMsg = pRpcMsg->pCont;
syncLogRecvHeartbeatReply(ths, pMsg, "");
int64_t tsMs = taosGetTimestampMs(); int64_t tsMs = taosGetTimestampMs();
char buf[128];
snprintf(buf, sizeof(buf), "recv local time:%" PRId64, tsMs);
syncLogRecvHeartbeatReply(ths, pMsg, buf);
// update last reply time, make decision whether the other node is alive or not // update last reply time, make decision whether the other node is alive or not
syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs); syncIndexMgrSetRecvTime(ths->pMatchIndex, &pMsg->srcId, tsMs);
......
...@@ -213,9 +213,11 @@ int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* destId, SRpcM ...@@ -213,9 +213,11 @@ int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* destId, SRpcM
} }
int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) { int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) {
int64_t ts = taosGetTimestampMs();
for (int32_t i = 0; i < pSyncNode->peersNum; ++i) { for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
if (syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId) != 0) { if (syncBuildHeartbeat(&rpcMsg, pSyncNode->vgId) != 0) {
sError("vgId:%d, build sync-heartbeat error", pSyncNode->vgId);
continue; continue;
} }
...@@ -226,6 +228,7 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) { ...@@ -226,6 +228,7 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) {
pSyncMsg->commitIndex = pSyncNode->commitIndex; pSyncMsg->commitIndex = pSyncNode->commitIndex;
pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode); pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
pSyncMsg->privateTerm = 0; pSyncMsg->privateTerm = 0;
pSyncMsg->timeStamp = ts;
// send msg // send msg
syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg); syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg);
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "syncUtil.h" #include "syncUtil.h"
#include "syncIndexMgr.h"
#include "syncMessage.h" #include "syncMessage.h"
#include "syncRaftCfg.h" #include "syncRaftCfg.h"
#include "syncRaftStore.h" #include "syncRaftStore.h"
...@@ -175,6 +176,36 @@ void syncCfg2SimpleStr(const SSyncCfg* pCfg, char* buf, int32_t bufLen) { ...@@ -175,6 +176,36 @@ void syncCfg2SimpleStr(const SSyncCfg* pCfg, char* buf, int32_t bufLen) {
} }
} }
// for leader
static void syncHearbeatReplyTime2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
int32_t len = 5;
for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
int64_t tsMs = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->replicasId[i]));
if (i < pSyncNode->replicaNum - 1) {
len += snprintf(buf + len, bufLen - len, "%d:%" PRId64 ",", i, tsMs);
} else {
len += snprintf(buf + len, bufLen - len, "%d:%" PRId64 "}", i, tsMs);
}
}
}
// for follower
static void syncHearbeatTime2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
int32_t len = 4;
for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
int64_t tsMs = syncIndexMgrGetRecvTime(pSyncNode->pNextIndex, &(pSyncNode->replicasId[i]));
if (i < pSyncNode->replicaNum - 1) {
len += snprintf(buf + len, bufLen - len, "%d:%" PRId64 ",", i, tsMs);
} else {
len += snprintf(buf + len, bufLen - len, "%d:%" PRId64 "}", i, tsMs);
}
}
}
static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) { static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
int32_t len = 1; int32_t len = 1;
...@@ -221,6 +252,12 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo ...@@ -221,6 +252,12 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
char peerStr[1024] = "{"; char peerStr[1024] = "{";
syncPeerState2Str(pNode, peerStr, sizeof(peerStr)); syncPeerState2Str(pNode, peerStr, sizeof(peerStr));
char hbrTimeStr[256] = "hbr:{";
syncHearbeatReplyTime2Str(pNode, hbrTimeStr, sizeof(hbrTimeStr));
char hbTimeStr[256] = "hb:{";
syncHearbeatTime2Str(pNode, hbTimeStr, sizeof(hbTimeStr));
int32_t quorum = syncNodeDynamicQuorum(pNode); int32_t quorum = syncNodeDynamicQuorum(pNode);
char eventLog[512]; // {0}; char eventLog[512]; // {0};
...@@ -243,12 +280,13 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo ...@@ -243,12 +280,13 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
"%s" "%s"
", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64 ", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64
", snap-tm:%" PRIu64 ", sby:%d, aq:%d, snaping:%" PRId64 ", r-num:%d, lcfg:%" PRId64 ", snap-tm:%" PRIu64 ", sby:%d, aq:%d, snaping:%" PRId64 ", r-num:%d, lcfg:%" PRId64
", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s", ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s, %s, %s",
pNode->vgId, syncStr(pNode->state), eventLog, currentTerm, pNode->commitIndex, logBeginIndex, pNode->vgId, syncStr(pNode->state), eventLog, currentTerm, pNode->commitIndex, logBeginIndex,
logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
pNode->pRaftCfg->isStandBy, aqItems, pNode->snapshottingIndex, pNode->replicaNum, pNode->pRaftCfg->isStandBy, aqItems, pNode->snapshottingIndex, pNode->replicaNum,
pNode->pRaftCfg->lastConfigIndex, pNode->changing, pNode->restoreFinish, quorum, pNode->pRaftCfg->lastConfigIndex, pNode->changing, pNode->restoreFinish, quorum,
pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr); pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr, hbTimeStr,
hbrTimeStr);
} }
} }
...@@ -395,9 +433,8 @@ void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const ...@@ -395,9 +433,8 @@ void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, sNTrace(pSyncNode,
"send sync-heartbeat to %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", pterm:%" PRId64 "send sync-heartbeat to %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64 "}, %s",
"}, %s", host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s);
host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s);
} }
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) { void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) {
...@@ -406,9 +443,9 @@ void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const ...@@ -406,9 +443,9 @@ void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, sNTrace(pSyncNode,
"recv sync-heartbeat from %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", pterm:%" PRId64 "recv sync-heartbeat from %s:%d {term:%" PRId64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", ts:%" PRId64
"}, %s", "}, %s",
host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s); host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s);
} }
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) { void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
...@@ -416,8 +453,8 @@ void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* p ...@@ -416,8 +453,8 @@ void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* p
uint16_t port; uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "send sync-heartbeat-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 "}, %s", host, port, sNTrace(pSyncNode, "send sync-heartbeat-reply from %s:%d {term:%" PRId64 ", ts:%" PRId64 "}, %s", host, port,
pMsg->term, pMsg->privateTerm, s); pMsg->term, pMsg->timeStamp, s);
} }
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) { void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册