提交 f7191d2f 编写于 作者: M Minghao Li

refactor(sync): add pre snapshot

上级 59448cd1
......@@ -22,9 +22,8 @@ extern "C" {
#include "sync.h"
#include "syncTools.h"
#include "tlog.h"
#include "ttimer.h"
#include "taosdef.h"
#include "tlog.h"
#include "ttimer.h"
// clang-format off
......@@ -344,6 +343,12 @@ void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMs
void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s);
void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s);
void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s);
void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s);
void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s);
void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s);
// for debug --------------
void syncNodePrint(SSyncNode* pObj);
void syncNodePrint2(char* s, SSyncNode* pObj);
......
......@@ -28,10 +28,11 @@ extern "C" {
#include "syncMessage.h"
#include "taosdef.h"
#define SYNC_SNAPSHOT_SEQ_INVALID -1
#define SYNC_SNAPSHOT_SEQ_FORCE_CLOSE -2
#define SYNC_SNAPSHOT_SEQ_BEGIN 0
#define SYNC_SNAPSHOT_SEQ_END 0x7FFFFFFF
#define SYNC_SNAPSHOT_SEQ_INVALID -2
#define SYNC_SNAPSHOT_SEQ_FORCE_CLOSE -3
#define SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT -1
#define SYNC_SNAPSHOT_SEQ_BEGIN 0
#define SYNC_SNAPSHOT_SEQ_END 0x7FFFFFFF
#define SYNC_SNAPSHOT_RETRY_MS 5000
......@@ -47,19 +48,19 @@ typedef struct SSyncSnapshotSender {
SSnapshot snapshot;
SSyncCfg lastConfig;
int64_t sendingMS;
SSyncNode *pSyncNode;
int32_t replicaIndex;
SyncTerm term;
SyncTerm privateTerm;
int64_t startTime;
bool finish;
// init when create
SSyncNode *pSyncNode;
int32_t replicaIndex;
} SSyncSnapshotSender;
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex);
void snapshotSenderDestroy(SSyncSnapshotSender *pSender);
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender);
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshotParam snapshotParam, SSnapshot snapshot,
void *pReader);
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender);
int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish);
int32_t snapshotSend(SSyncSnapshotSender *pSender);
int32_t snapshotReSend(SSyncSnapshotSender *pSender);
......@@ -76,11 +77,13 @@ typedef struct SSyncSnapshotReceiver {
int32_t ack;
void *pWriter;
SyncTerm term;
SyncTerm privateTerm;
SSnapshotParam snapshotParam;
SSnapshot snapshot;
SRaftId fromId;
SSyncNode *pSyncNode;
int64_t startTime;
// init when create
SSyncNode *pSyncNode;
} SSyncSnapshotReceiver;
......
......@@ -142,7 +142,6 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) {
// pReply->matchIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
pReply->matchIndex = SYNC_INDEX_INVALID;
pReply->lastSendIndex = pMsg->prevLogIndex + 1;
pReply->privateTerm = ths->pNewNodeReceiver->privateTerm;
pReply->startTime = ths->startTime;
if (pMsg->term < ths->pRaftStore->currentTerm) {
......
......@@ -73,6 +73,7 @@ static void syncNodeStartSnapshotOnce(SSyncNode* ths, SyncIndex beginIndex, Sync
int32_t code = ths->pFsm->FpSnapshotStartRead(ths->pFsm, &readerParam, &pReader);
ASSERT(code == 0);
#if 0
if (pMsg->privateTerm < pSender->privateTerm) {
ASSERT(pReader != NULL);
snapshotSenderStart(pSender, readerParam, snapshot, pReader);
......@@ -82,6 +83,7 @@ static void syncNodeStartSnapshotOnce(SSyncNode* ths, SyncIndex beginIndex, Sync
ths->pFsm->FpSnapshotStopRead(ths->pFsm, pReader);
}
}
#endif
}
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
......
......@@ -2355,6 +2355,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
// init peer mgr
syncNodePeerStateInit(pSyncNode);
#if 0
// update sender private term
SSyncSnapshotSender* pMySender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->myRaftId));
if (pMySender != NULL) {
......@@ -2365,6 +2366,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
}
(pMySender->privateTerm) += 100;
}
#endif
// close receiver
if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
......@@ -3718,3 +3720,11 @@ void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshot
pMsg->term, pMsg->snapStart, s);
syncNodeEventLog(pSyncNode, logBuf);
}
void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {}
void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {}
void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {}
void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {}
......@@ -2873,8 +2873,8 @@ cJSON* syncSnapshotSend2Json(const SyncSnapshotSend* pMsg) {
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term);
cJSON_AddStringToObject(pRoot, "term", u64buf);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->privateTerm);
cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->startTime);
cJSON_AddStringToObject(pRoot, "startTime", u64buf);
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->beginIndex);
cJSON_AddStringToObject(pRoot, "beginIndex", u64buf);
......@@ -3048,8 +3048,8 @@ cJSON* syncSnapshotRsp2Json(const SyncSnapshotRsp* pMsg) {
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->term);
cJSON_AddStringToObject(pRoot, "term", u64buf);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->privateTerm);
cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->startTime);
cJSON_AddStringToObject(pRoot, "startTime", u64buf);
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->lastIndex);
cJSON_AddStringToObject(pRoot, "lastIndex", u64buf);
......@@ -3059,6 +3059,9 @@ cJSON* syncSnapshotRsp2Json(const SyncSnapshotRsp* pMsg) {
cJSON_AddNumberToObject(pRoot, "ack", pMsg->ack);
cJSON_AddNumberToObject(pRoot, "code", pMsg->code);
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->snapBeginIndex);
cJSON_AddStringToObject(pRoot, "snap-begin", u64buf);
}
cJSON* pJson = cJSON_CreateObject();
......
......@@ -62,8 +62,8 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId) {
syncNodeEventLog(pSyncNode, logBuf);
// start snapshot
int32_t code = syncNodeStartSnapshot(pSyncNode, pDestId);
ASSERT(code == 0);
// int32_t code = syncNodeStartSnapshot(pSyncNode, pDestId);
return 0;
}
......
此差异已折叠。
......@@ -50,7 +50,6 @@ SSyncSnapshotReceiver* createReceiver() {
pReceiver->ack = 20;
pReceiver->pWriter = (void*)0x11;
pReceiver->term = 66;
pReceiver->privateTerm = 99;
return pReceiver;
}
......
......@@ -21,7 +21,7 @@ SyncSnapshotRsp *createMsg() {
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678);
pMsg->destId.vgId = 100;
pMsg->term = 11;
pMsg->privateTerm = 99;
pMsg->startTime = 99;
pMsg->lastIndex = 22;
pMsg->lastTerm = 33;
pMsg->ack = 44;
......
......@@ -21,7 +21,6 @@ SyncSnapshotSend *createMsg() {
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678);
pMsg->destId.vgId = 100;
pMsg->term = 11;
pMsg->privateTerm = 99;
pMsg->lastIndex = 22;
pMsg->lastTerm = 33;
......
......@@ -55,7 +55,8 @@ SSyncSnapshotSender* createSender() {
pSender->snapshot.lastApplyTerm = 88;
pSender->sendingMS = 77;
pSender->term = 66;
pSender->privateTerm = 99;
//pSender->privateTerm = 99;
return pSender;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册