提交 c29401ed 编写于 作者: M Minghao Li

refactor(sync): re send snapshot in timer-routine

上级 b9447601
...@@ -47,6 +47,7 @@ extern "C" { ...@@ -47,6 +47,7 @@ extern "C" {
#define SYNC_HEARTBEAT_SLOW_MS 1500 #define SYNC_HEARTBEAT_SLOW_MS 1500
#define SYNC_HEARTBEAT_REPLY_SLOW_MS 1500 #define SYNC_HEARTBEAT_REPLY_SLOW_MS 1500
#define SYNC_SNAP_RESEND_MS 1000 * 60
#define SYNC_MAX_BATCH_SIZE 1 #define SYNC_MAX_BATCH_SIZE 1
#define SYNC_INDEX_BEGIN 0 #define SYNC_INDEX_BEGIN 0
......
...@@ -44,6 +44,7 @@ typedef struct SSyncSnapshotSender { ...@@ -44,6 +44,7 @@ typedef struct SSyncSnapshotSender {
SyncTerm term; SyncTerm term;
int64_t startTime; int64_t startTime;
int64_t endTime; int64_t endTime;
int64_t lastSendTime;
bool finish; bool finish;
// init when create // init when create
......
...@@ -103,6 +103,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { ...@@ -103,6 +103,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
pSender->sendingMS = 0; pSender->sendingMS = 0;
pSender->term = pSender->pSyncNode->pRaftStore->currentTerm; pSender->term = pSender->pSyncNode->pRaftStore->currentTerm;
pSender->startTime = taosGetTimestampMs(); pSender->startTime = taosGetTimestampMs();
pSender->lastSendTime = pSender->startTime;
pSender->finish = false; pSender->finish = false;
// build begin msg // build begin msg
...@@ -201,6 +202,8 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { ...@@ -201,6 +202,8 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg); syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg);
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, ""); syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "");
pSender->lastSendTime = taosGetTimestampMs();
// event log // event log
if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) { if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) {
sSTrace(pSender, "snapshot sender finish"); sSTrace(pSender, "snapshot sender finish");
...@@ -213,7 +216,7 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { ...@@ -213,7 +216,7 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
// send snapshot data from cache // send snapshot data from cache
int32_t snapshotReSend(SSyncSnapshotSender *pSender) { int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
// send current block data // send current block data
if (pSender->pCurrentBlock != NULL && pSender->blockLen > 0) {
// build msg // build msg
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
(void)syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId); (void)syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId);
...@@ -229,16 +232,19 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { ...@@ -229,16 +232,19 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
pMsg->lastConfig = pSender->lastConfig; pMsg->lastConfig = pSender->lastConfig;
pMsg->seq = pSender->seq; pMsg->seq = pSender->seq;
if (pSender->pCurrentBlock != NULL && pSender->blockLen > 0) {
// pMsg->privateTerm = pSender->privateTerm; // pMsg->privateTerm = pSender->privateTerm;
memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen); memcpy(pMsg->data, pSender->pCurrentBlock, pSender->blockLen);
}
// send msg // send msg
syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg); syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg);
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, ""); syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "");
pSender->lastSendTime = taosGetTimestampMs();
// event log // event log
sSTrace(pSender, "snapshot sender resend"); sSTrace(pSender, "snapshot sender resend");
}
return 0; return 0;
} }
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "syncRaftLog.h" #include "syncRaftLog.h"
#include "syncReplication.h" #include "syncReplication.h"
#include "syncRespMgr.h" #include "syncRespMgr.h"
#include "syncSnapshot.h"
#include "syncUtil.h" #include "syncUtil.h"
static void syncNodeCleanConfigIndex(SSyncNode* ths) { static void syncNodeCleanConfigIndex(SSyncNode* ths) {
...@@ -70,6 +71,20 @@ static int32_t syncNodeTimerRoutine(SSyncNode* ths) { ...@@ -70,6 +71,20 @@ static int32_t syncNodeTimerRoutine(SSyncNode* ths) {
} }
int64_t timeNow = taosGetTimestampMs(); int64_t timeNow = taosGetTimestampMs();
for (int i = 0; i < ths->peersNum; ++i) {
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(ths->peersId[i]));
if (pSender != NULL) {
if (ths->isStart && ths->state == TAOS_SYNC_STATE_LEADER && pSender->start &&
timeNow - pSender->lastSendTime > SYNC_SNAP_RESEND_MS) {
snapshotReSend(pSender);
} else {
sTrace("vgId:%d, do not resend: nstart%d, now:%" PRId64 ", lstsend:%" PRId64 ", diff:%" PRId64, ths->vgId,
ths->isStart, timeNow, pSender->lastSendTime, timeNow - pSender->lastSendTime);
}
}
}
if (atomic_load_64(&ths->snapshottingIndex) != SYNC_INDEX_INVALID) { if (atomic_load_64(&ths->snapshottingIndex) != SYNC_INDEX_INVALID) {
// end timeout wal snapshot // end timeout wal snapshot
if (timeNow - ths->snapshottingTime > SYNC_DEL_WAL_MS && if (timeNow - ths->snapshottingTime > SYNC_DEL_WAL_MS &&
......
...@@ -568,7 +568,7 @@ void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* p ...@@ -568,7 +568,7 @@ void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* p
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, sNTrace(pSyncNode,
"send sync-snapshot-send from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", end:%" PRId64 ", lterm:%" PRId64 "send sync-snapshot-send to %s:%d {term:%" PRId64 ", begin:%" PRId64 ", end:%" PRId64 ", lterm:%" PRId64
", stime:%" PRId64 ", seq:%d}, %s", ", stime:%" PRId64 ", seq:%d}, %s",
host, port, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->seq, s); host, port, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->seq, s);
} }
...@@ -595,7 +595,7 @@ void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMs ...@@ -595,7 +595,7 @@ void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMs
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, sNTrace(pSyncNode,
"send sync-snapshot-rsp from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64 "send sync-snapshot-rsp to %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64
", stime:%" PRId64 ", ack:%d}, %s", ", stime:%" PRId64 ", ack:%d}, %s",
host, port, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->ack, s); host, port, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->ack, s);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册