提交 59448cd1 编写于 作者: M Minghao Li

refactor(sync): pre snapshot on message

上级 21d60cb5
...@@ -550,7 +550,7 @@ typedef struct SyncPreSnapshotReply { ...@@ -550,7 +550,7 @@ typedef struct SyncPreSnapshotReply {
// private data // private data
SyncTerm term; SyncTerm term;
SyncIndex matchIndex; SyncIndex snapStart;
} SyncPreSnapshotReply; } SyncPreSnapshotReply;
......
...@@ -3702,8 +3702,9 @@ void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshot ...@@ -3702,8 +3702,9 @@ void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshot
uint16_t port; uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
char logBuf[256]; char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "send sync-pre-snapshot-reply to %s:%d {term:%" PRIu64 ", match:%" PRId64 "}, %s", snprintf(logBuf, sizeof(logBuf),
host, port, pMsg->term, pMsg->matchIndex, s); "send sync-pre-snapshot-reply to %s:%d {term:%" PRIu64 ", snap-start:%" PRId64 "}, %s", host, port,
pMsg->term, pMsg->snapStart, s);
syncNodeEventLog(pSyncNode, logBuf); syncNodeEventLog(pSyncNode, logBuf);
} }
...@@ -3712,7 +3713,8 @@ void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshot ...@@ -3712,7 +3713,8 @@ void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshot
uint16_t port; uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256]; char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "recv sync-pre-snapshot-reply from %s:%d {term:%" PRIu64 ", match:%" PRId64 "}, %s", snprintf(logBuf, sizeof(logBuf),
host, port, pMsg->term, pMsg->matchIndex, s); "recv sync-pre-snapshot-reply from %s:%d {term:%" PRIu64 ", snap-start:%" PRId64 "}, %s", host, port,
pMsg->term, pMsg->snapStart, s);
syncNodeEventLog(pSyncNode, logBuf); syncNodeEventLog(pSyncNode, logBuf);
} }
...@@ -2568,8 +2568,8 @@ cJSON* syncPreSnapshotReply2Json(const SyncPreSnapshotReply* pMsg) { ...@@ -2568,8 +2568,8 @@ cJSON* syncPreSnapshotReply2Json(const SyncPreSnapshotReply* pMsg) {
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term); snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term);
cJSON_AddStringToObject(pRoot, "term", u64buf); cJSON_AddStringToObject(pRoot, "term", u64buf);
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->matchIndex); snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->snapStart);
cJSON_AddStringToObject(pRoot, "match-index", u64buf); cJSON_AddStringToObject(pRoot, "snap-start", u64buf);
} }
cJSON* pJson = cJSON_CreateObject(); cJSON* pJson = cJSON_CreateObject();
......
...@@ -925,10 +925,60 @@ int32_t syncNodeOnSnapshotReply(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) { ...@@ -925,10 +925,60 @@ int32_t syncNodeOnSnapshotReply(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
int32_t syncNodeOnPreSnapshot(SSyncNode *ths, SyncPreSnapshot *pMsg) { int32_t syncNodeOnPreSnapshot(SSyncNode *ths, SyncPreSnapshot *pMsg) {
syncLogRecvSyncPreSnapshot(ths, pMsg, ""); syncLogRecvSyncPreSnapshot(ths, pMsg, "");
SyncPreSnapshotReply *pMsgReply = syncPreSnapshotReplyBuild(ths->vgId);
pMsgReply->srcId = ths->myRaftId;
pMsgReply->destId = pMsg->srcId;
pMsgReply->term = ths->pRaftStore->currentTerm;
SSyncLogStoreData *pData = ths->pLogStore->data;
SWal *pWal = pData->pWal;
if (syncNodeIsMnode(ths)) {
pMsgReply->snapStart = SYNC_INDEX_BEGIN;
} else {
bool isEmpty = ths->pLogStore->syncLogIsEmpty(ths->pLogStore);
int64_t walCommitVer = walGetCommittedVer(pWal);
if (!isEmpty && ths->commitIndex != walCommitVer) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "commit not same, wal-commit:%" PRId64 ", commit:%" PRId64 ", ignore",
walCommitVer, ths->commitIndex);
syncNodeErrorLog(ths, logBuf);
goto _IGNORE;
}
pMsgReply->snapStart = ths->commitIndex + 1;
// make local log clean
int32_t code = ths->pLogStore->syncLogTruncate(ths->pLogStore, pMsgReply->snapStart);
if (code != 0) {
syncNodeErrorLog(ths, "truncate wal error");
goto _IGNORE;
}
}
// can not write behind _RESPONSE
SRpcMsg rpcMsg;
_RESPONSE:
syncPreSnapshotReply2RpcMsg(pMsgReply, &rpcMsg);
syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg);
syncPreSnapshotReplyDestroy(pMsgReply);
return 0;
_IGNORE:
syncPreSnapshotReplyDestroy(pMsgReply);
return 0; return 0;
} }
int32_t syncNodeOnPreSnapshotReply(SSyncNode *ths, SyncPreSnapshotReply *pMsg) { int32_t syncNodeOnPreSnapshotReply(SSyncNode *ths, SyncPreSnapshotReply *pMsg) {
syncLogRecvSyncPreSnapshotReply(ths, pMsg, ""); syncLogRecvSyncPreSnapshotReply(ths, pMsg, "");
// start snapshot
return 0; return 0;
} }
\ No newline at end of file
...@@ -22,7 +22,7 @@ SyncPreSnapshotReply *createMsg() { ...@@ -22,7 +22,7 @@ SyncPreSnapshotReply *createMsg() {
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678);
pMsg->destId.vgId = 100; pMsg->destId.vgId = 100;
pMsg->term = 9527; pMsg->term = 9527;
pMsg->matchIndex = 12306; pMsg->snapStart = 12306;
return pMsg; return pMsg;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册