提交 74b48e6d 编写于 作者: M Minghao Li

refacotr(sync): modify sync-snapshot

上级 3e6d6fe8
......@@ -35,8 +35,10 @@ extern bool gRaftDetailLog;
#define SYNC_MAX_PROGRESS_WAIT_MS 4000
#define SYNC_MAX_START_TIME_RANGE_MS (1000 * 20)
#define SYNC_MAX_RECV_TIME_RANGE_MS 1200
#define SYNC_DEL_WAL_MS (1000 * 60)
#define SYNC_ADD_QUORUM_COUNT 3
#define SYNC_MNODE_MAX_LOG_NUM 10000
#define SYNC_MNODE_LOG_RETENTION 10000
#define SYNC_VNODE_LOG_RETENTION 500
#define SYNC_APPEND_ENTRIES_TIMEOUT_MS 10000
......
......@@ -459,6 +459,8 @@ typedef struct SyncHeartbeat {
SyncTerm term;
SyncIndex commitIndex;
SyncTerm privateTerm;
SyncTerm minMatchIndex;
} SyncHeartbeat;
SyncHeartbeat* syncHeartbeatBuild(int32_t vgId);
......
......@@ -197,6 +197,8 @@ typedef struct SSyncNode {
bool changing;
int64_t snapshottingIndex;
int64_t snapshottingTime;
int64_t minMatchIndex;
int64_t startTime;
int64_t leaderTime;
......@@ -236,15 +238,16 @@ int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode);
// utils --------------
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg);
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg);
cJSON* syncNode2Json(const SSyncNode* pSyncNode);
char* syncNode2Str(const SSyncNode* pSyncNode);
void syncNodeEventLog(const SSyncNode* pSyncNode, char* str);
void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str);
char* syncNode2SimpleStr(const SSyncNode* pSyncNode);
bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg);
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg);
cJSON* syncNode2Json(const SSyncNode* pSyncNode);
char* syncNode2Str(const SSyncNode* pSyncNode);
void syncNodeEventLog(const SSyncNode* pSyncNode, char* str);
void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str);
char* syncNode2SimpleStr(const SSyncNode* pSyncNode);
bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode);
SSyncNode* syncNodeAcquire(int64_t rid);
void syncNodeRelease(SSyncNode* pNode);
......
......@@ -288,6 +288,22 @@ int32_t syncLeaderTransferTo(int64_t rid, SNodeInfo newLeader) {
return ret;
}
SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) {
SyncIndex minMatchIndex = SYNC_INDEX_INVALID;
if (pSyncNode->peersNum > 0) {
minMatchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[0]));
}
for (int32_t i = 1; i < pSyncNode->peersNum; ++i) {
SyncIndex matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
if (matchIndex < minMatchIndex) {
minMatchIndex = matchIndex;
}
}
return minMatchIndex;
}
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
......@@ -298,58 +314,118 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
int32_t code = 0;
if (syncNodeIsMnode(pSyncNode)) {
// mnode
int64_t logRetention = SYNC_MNODE_LOG_RETENTION;
SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
SyncIndex endIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
int64_t logNum = endIndex - beginIndex;
if (logNum < SYNC_MNODE_MAX_LOG_NUM) {
bool isEmpty = pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore);
if (isEmpty || (!isEmpty && logNum < logRetention)) {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "mnode log num:%ld, do not delete", logNum);
snprintf(logBuf, sizeof(logBuf), "new-snapshot-index:%ld, log-num:%ld, empty:%d, do not delete wal",
lastApplyIndex, logNum, isEmpty);
syncNodeEventLog(pSyncNode, logBuf);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return 0;
}
}
for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
int64_t matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
if (lastApplyIndex > matchIndex) {
do {
char host[64];
uint16_t port;
syncUtilU642Addr(pSyncNode->peersId[i].addr, host, sizeof(host), &port);
goto _DEL_WAL;
} else {
// vnode
if (pSyncNode->replicaNum > 1) {
// multi replicas
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
pSyncNode->minMatchIndex = syncMinMatchIndex(pSyncNode);
for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
int64_t matchIndex = syncIndexMgrGetIndex(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
if (lastApplyIndex > matchIndex) {
do {
char host[64];
uint16_t port;
syncUtilU642Addr(pSyncNode->peersId[i].addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"new-snapshot-index:%ld is greater than match-index:%ld of %s:%d, do not delete wal",
lastApplyIndex, matchIndex, host, port);
syncNodeEventLog(pSyncNode, logBuf);
} while (0);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return 0;
}
}
} else if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
if (lastApplyIndex > pSyncNode->minMatchIndex) {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"new-snapshot-index:%ld is greater than min-match-index:%ld, do not delete wal", lastApplyIndex,
pSyncNode->minMatchIndex);
syncNodeEventLog(pSyncNode, logBuf);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return 0;
}
} else if (pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE) {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"new-snapshot-index:%ld is greater than match-index:%ld of %s:%d, do not delete wal", lastApplyIndex,
matchIndex, host, port);
snprintf(logBuf, sizeof(logBuf), "new-snapshot-index:%ld candidate, do not delete wal", lastApplyIndex,
pSyncNode->minMatchIndex);
syncNodeEventLog(pSyncNode, logBuf);
} while (0);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return 0;
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return 0;
} else {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "new-snapshot-index:%ld unknown state, do not delete wal", lastApplyIndex,
pSyncNode->minMatchIndex);
syncNodeEventLog(pSyncNode, logBuf);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return 0;
}
goto _DEL_WAL;
} else {
// one replica
goto _DEL_WAL;
}
}
if (pSyncNode->replicaNum == 1) {
SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
code = walBeginSnapshot(pData->pWal, lastApplyIndex);
} else {
// calculate snapshot index
_DEL_WAL:
do {
SyncIndex snapshottingIndex = atomic_load_64(&pSyncNode->snapshottingIndex);
if (snapshottingIndex == SYNC_INDEX_INVALID) {
atomic_store_64(&pSyncNode->snapshottingIndex, lastApplyIndex);
pSyncNode->snapshottingTime = taosGetTimestampMs();
do {
SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
code = walBeginSnapshot(pData->pWal, lastApplyIndex);
if (code == 0) {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "wal snapshot begin, index:%ld, last apply index:%ld",
pSyncNode->snapshottingIndex, lastApplyIndex);
syncNodeEventLog(pSyncNode, logBuf);
} while (0);
SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
code = walBeginSnapshot(pData->pWal, lastApplyIndex);
} else {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "wal snapshot begin error since:%s, index:%ld, last apply index:%ld",
terrstr(terrno), pSyncNode->snapshottingIndex, lastApplyIndex);
syncNodeErrorLog(pSyncNode, logBuf);
atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
}
} else {
char logBuf[256];
......@@ -357,7 +433,7 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
snapshottingIndex, lastApplyIndex);
syncNodeEventLog(pSyncNode, logBuf);
}
}
} while (0);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return code;
......@@ -373,16 +449,22 @@ int32_t syncEndSnapshot(int64_t rid) {
int32_t code = 0;
if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) {
do {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "wal snapshot end, index:%ld", atomic_load_64(&pSyncNode->snapshottingIndex));
syncNodeEventLog(pSyncNode, logBuf);
} while (0);
SSyncLogStoreData* pData = pSyncNode->pLogStore->data;
code = walEndSnapshot(pData->pWal);
if (code != 0) {
sError("vgId:%d, wal end snapshot error since:%s", terrstr(terrno));
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return -1;
} else {
do {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "wal snapshot end, index:%ld", atomic_load_64(&pSyncNode->snapshottingIndex));
syncNodeEventLog(pSyncNode, logBuf);
} while (0);
atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
}
}
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
......@@ -1208,6 +1290,9 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
// peer state
syncNodePeerStateInit(pSyncNode);
// min match index
pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
// start in syncNodeStart
// start raft
// syncNodeBecomeFollower(pSyncNode);
......@@ -1745,18 +1830,18 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
char logBuf[256 + 256];
if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) {
snprintf(logBuf, sizeof(logBuf),
"vgId:%d, sync %s %s, tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", snap:%" PRId64
", snap-tm:%" PRIu64
"vgId:%d, sync %s %s, tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64
", snap:%" PRId64 ", snap-tm:%" PRIu64
", sby:%d, "
"stgy:%d, bch:%d, "
"r-num:%d, "
"lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, pSyncNode->pRaftCfg->batchSize,
pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing,
pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode), pSyncNode->electTimerLogicClockUser,
pSyncNode->heartbeatTimerLogicClockUser, printStr);
pSyncNode->commitIndex, logBeginIndex, logLastIndex, pSyncNode->minMatchIndex, snapshot.lastApplyIndex,
snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy,
pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode),
pSyncNode->electTimerLogicClockUser, pSyncNode->heartbeatTimerLogicClockUser, printStr);
} else {
snprintf(logBuf, sizeof(logBuf), "%s", str);
}
......@@ -1769,18 +1854,18 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
char* s = (char*)taosMemoryMalloc(len);
if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) {
snprintf(s, len,
"vgId:%d, sync %s %s, tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", snap:%" PRId64
", snap-tm:%" PRIu64
"vgId:%d, sync %s %s, tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64
", snap:%" PRId64 ", snap-tm:%" PRIu64
", sby:%d, "
"stgy:%d, bch:%d, "
"r-num:%d, "
"lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, pSyncNode->pRaftCfg->batchSize,
pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing,
pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode), pSyncNode->electTimerLogicClockUser,
pSyncNode->heartbeatTimerLogicClockUser, printStr);
pSyncNode->commitIndex, logBeginIndex, logLastIndex, pSyncNode->minMatchIndex, snapshot.lastApplyIndex,
snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy,
pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode),
pSyncNode->electTimerLogicClockUser, pSyncNode->heartbeatTimerLogicClockUser, printStr);
} else {
snprintf(s, len, "%s", str);
}
......@@ -1818,17 +1903,18 @@ inline void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) {
char logBuf[256 + 256];
if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) {
snprintf(logBuf, sizeof(logBuf),
"vgId:%d, sync %s %s, tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", snap:%" PRId64
", snap-tm:%" PRIu64
"vgId:%d, sync %s %s, tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64
", snap:%" PRId64 ", snap-tm:%" PRIu64
", sby:%d, "
"stgy:%d, bch:%d, "
"r-num:%d, "
"lcfg:%" PRId64 ", chging:%d, rsto:%d, %s",
"lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, pSyncNode->pRaftCfg->batchSize,
pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing,
pSyncNode->restoreFinish, printStr);
pSyncNode->commitIndex, logBeginIndex, logLastIndex, pSyncNode->minMatchIndex, snapshot.lastApplyIndex,
snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy,
pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode),
pSyncNode->electTimerLogicClockUser, pSyncNode->heartbeatTimerLogicClockUser, printStr);
} else {
snprintf(logBuf, sizeof(logBuf), "%s", str);
}
......@@ -1839,17 +1925,18 @@ inline void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) {
char* s = (char*)taosMemoryMalloc(len);
if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) {
snprintf(s, len,
"vgId:%d, sync %s %s, tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", snap:%" PRId64
", snap-tm:%" PRIu64
"vgId:%d, sync %s %s, tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64
", snap:%" PRId64 ", snap-tm:%" PRIu64
", sby:%d, "
"stgy:%d, bch:%d, "
"r-num:%d, "
"lcfg:%" PRId64 ", chging:%d, rsto:%d, %s",
"lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, pSyncNode->pRaftCfg->batchSize,
pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing,
pSyncNode->restoreFinish, printStr);
pSyncNode->commitIndex, logBeginIndex, logLastIndex, pSyncNode->minMatchIndex, snapshot.lastApplyIndex,
snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy,
pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
pSyncNode->changing, pSyncNode->restoreFinish, syncNodeDynamicQuorum(pSyncNode),
pSyncNode->electTimerLogicClockUser, pSyncNode->heartbeatTimerLogicClockUser, printStr);
} else {
snprintf(s, len, "%s", str);
}
......@@ -2197,6 +2284,9 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
pSyncNode->pFsm->FpBecomeFollowerCb(pSyncNode->pFsm);
}
// min match index
pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
// trace log
do {
int32_t debugStrLen = strlen(debugStr);
......@@ -2296,6 +2386,9 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
pSyncNode->pFsm->FpBecomeLeaderCb(pSyncNode->pFsm);
}
// min match index
pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
// trace log
do {
int32_t debugStrLen = strlen(debugStr);
......@@ -2671,6 +2764,10 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
SSyncNode* pSyncNode = pData->pSyncNode;
SSyncTimer* pSyncTimer = pData->pTimer;
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
return;
}
syncNodeEventLog(pSyncNode, "eq peer hb timer");
int64_t timerLogicClock = atomic_load_64(&pSyncTimer->logicClock);
......@@ -2683,6 +2780,7 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
pSyncMsg->destId = pData->destId;
pSyncMsg->term = pSyncNode->pRaftStore->currentTerm;
pSyncMsg->commitIndex = pSyncNode->commitIndex;
pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
pSyncMsg->privateTerm = 0;
SRpcMsg rpcMsg;
......@@ -2845,6 +2943,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) {
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_LEADER) {
syncNodeResetElectTimer(ths);
ths->minMatchIndex = pMsg->minMatchIndex;
#if 0
if (ths->state == TAOS_SYNC_STATE_FOLLOWER) {
......@@ -3353,7 +3452,7 @@ const char* syncTimerTypeStr(enum ESyncTimeoutType timerType) {
void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "recv sync-timer {type:%s, lc:%lu, ms:%d, data:%p}, %s",
syncTimerTypeStr(pMsg->timeoutType), s, pMsg->logicClock, pMsg->timerMS, pMsg->data);
syncTimerTypeStr(pMsg->timeoutType), pMsg->logicClock, pMsg->timerMS, pMsg->data, s);
syncNodeEventLog(pSyncNode, logBuf);
}
......@@ -3485,8 +3584,9 @@ void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"send sync-heartbeat from %s:%d {term:%" PRIu64 ", cmt:%" PRIu64 ", pterm:%" PRIu64 "}, %s", host, port,
pMsg->term, pMsg->commitIndex, pMsg->privateTerm, s);
"send sync-heartbeat from %s:%d {term:%" PRIu64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", pterm:%" PRIu64
"}, %s",
host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s);
syncNodeEventLog(pSyncNode, logBuf);
}
......@@ -3496,8 +3596,9 @@ void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-heartbeat from %s:%d {term:%" PRIu64 ", cmt:%" PRIu64 ", pterm:%" PRIu64 "}, %s", host, port,
pMsg->term, pMsg->commitIndex, pMsg->privateTerm, s);
"recv sync-heartbeat from %s:%d {term:%" PRIu64 ", cmt:%" PRId64 ", min-match:%" PRId64 ", pterm:%" PRIu64
"}, %s",
host, port, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->privateTerm, s);
syncNodeEventLog(pSyncNode, logBuf);
}
......
......@@ -216,6 +216,7 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) {
pSyncMsg->destId = pSyncNode->peersId[i];
pSyncMsg->term = pSyncNode->pRaftStore->currentTerm;
pSyncMsg->commitIndex = pSyncNode->commitIndex;
pSyncMsg->minMatchIndex = syncMinMatchIndex(pSyncNode);
pSyncMsg->privateTerm = 0;
SRpcMsg rpcMsg;
......
......@@ -16,6 +16,7 @@
#include "syncTimeout.h"
#include "syncElection.h"
#include "syncRaftCfg.h"
#include "syncRaftLog.h"
#include "syncReplication.h"
#include "syncRespMgr.h"
......@@ -63,10 +64,31 @@ int32_t syncNodeTimerRoutine(SSyncNode* ths) {
// timer replicate
syncNodeReplicate(ths);
// clean mnode index
if (syncNodeIsMnode(ths)) {
syncNodeCleanConfigIndex(ths);
}
// end timeout wal snapshot
int64_t timeNow = taosGetTimestampMs();
if (timeNow - ths->snapshottingIndex > SYNC_DEL_WAL_MS &&
atomic_load_64(&ths->snapshottingIndex) != SYNC_INDEX_INVALID) {
SSyncLogStoreData* pData = ths->pLogStore->data;
int32_t code = walEndSnapshot(pData->pWal);
if (code != 0) {
sError("vgId:%d, wal end snapshot error since:%s", terrstr(terrno));
return -1;
} else {
do {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "wal snapshot end, index:%ld", atomic_load_64(&ths->snapshottingIndex));
syncNodeEventLog(ths, logBuf);
} while (0);
atomic_store_64(&ths->snapshottingIndex, SYNC_INDEX_INVALID);
}
}
#if 0
if (!syncNodeIsMnode(ths)) {
syncRespClean(ths->pSyncRespMgr);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册