提交 326714a0 编写于 作者: M Minghao Li

fix(sync): snapshot maybe change when sending

上级 44a893a1
...@@ -39,8 +39,8 @@ typedef struct SSyncSnapshotSender { ...@@ -39,8 +39,8 @@ typedef struct SSyncSnapshotSender {
bool start; bool start;
int32_t seq; int32_t seq;
int32_t ack; int32_t ack;
void * pReader; void *pReader;
void * pCurrentBlock; void *pCurrentBlock;
int32_t blockLen; int32_t blockLen;
SSnapshot snapshot; SSnapshot snapshot;
int64_t sendingMS; int64_t sendingMS;
...@@ -50,20 +50,22 @@ typedef struct SSyncSnapshotSender { ...@@ -50,20 +50,22 @@ typedef struct SSyncSnapshotSender {
bool finish; bool finish;
} SSyncSnapshotSender; } SSyncSnapshotSender;
void snapshotSenderDoStart(SSyncSnapshotSender *pSender);
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex); SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex);
void snapshotSenderDestroy(SSyncSnapshotSender *pSender); void snapshotSenderDestroy(SSyncSnapshotSender *pSender);
void snapshotSenderStart(SSyncSnapshotSender *pSender); void snapshotSenderStart(SSyncSnapshotSender *pSender);
void snapshotSenderStop(SSyncSnapshotSender *pSender); void snapshotSenderStop(SSyncSnapshotSender *pSender);
int32_t snapshotSend(SSyncSnapshotSender *pSender); int32_t snapshotSend(SSyncSnapshotSender *pSender);
int32_t snapshotReSend(SSyncSnapshotSender *pSender); int32_t snapshotReSend(SSyncSnapshotSender *pSender);
cJSON * snapshotSender2Json(SSyncSnapshotSender *pSender); cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender);
char * snapshotSender2Str(SSyncSnapshotSender *pSender); char *snapshotSender2Str(SSyncSnapshotSender *pSender);
typedef struct SSyncSnapshotReceiver { typedef struct SSyncSnapshotReceiver {
bool start; bool start;
int32_t ack; int32_t ack;
void * pWriter; void *pWriter;
SyncTerm term; SyncTerm term;
SSyncNode *pSyncNode; SSyncNode *pSyncNode;
...@@ -74,8 +76,8 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, int32_t repl ...@@ -74,8 +76,8 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, int32_t repl
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver); void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver); void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver); void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver);
cJSON * snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver);
char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
int32_t syncNodeOnSnapshotSendCb(SSyncNode *ths, SyncSnapshotSend *pMsg); int32_t syncNodeOnSnapshotSendCb(SSyncNode *ths, SyncSnapshotSend *pMsg);
int32_t syncNodeOnSnapshotRspCb(SSyncNode *ths, SyncSnapshotRsp *pMsg); int32_t syncNodeOnSnapshotRspCb(SSyncNode *ths, SyncSnapshotRsp *pMsg);
......
...@@ -174,18 +174,15 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries ...@@ -174,18 +174,15 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
} }
ASSERT(pSender != NULL); ASSERT(pSender != NULL);
// calculate sentryIndex
SyncIndex sentryIndex; SyncIndex sentryIndex;
if (pSender->start) { if (pSender->start && pSender->term == ths->pRaftStore->currentTerm) {
// already start
sentryIndex = pSender->snapshot.lastApplyIndex; sentryIndex = pSender->snapshot.lastApplyIndex;
} else { } else {
// start send snapshot // start send snapshot, first time
if (!(pSender->term == ths->pRaftStore->currentTerm && pSender->finish == true)) { snapshotSenderDoStart(pSender);
snapshotSenderStart(pSender); pSender->start = true;
} else {
sInfo("snapshot send finish, send_term:%lu, current_term:%lu", pSender->term, ths->pRaftStore->currentTerm);
}
sentryIndex = pSender->snapshot.lastApplyIndex; sentryIndex = pSender->snapshot.lastApplyIndex;
} }
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
#include "syncUtil.h" #include "syncUtil.h"
#include "wal.h" #include "wal.h"
static void snapshotSenderDoStart(SSyncSnapshotSender *pSender); // static void snapshotSenderDoStart(SSyncSnapshotSender *pSender);
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver); static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver);
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) { SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
...@@ -59,7 +59,7 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) { ...@@ -59,7 +59,7 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
} }
// begin send snapshot (current term, seq begin) // begin send snapshot (current term, seq begin)
static void snapshotSenderDoStart(SSyncSnapshotSender *pSender) { void snapshotSenderDoStart(SSyncSnapshotSender *pSender) {
pSender->term = pSender->pSyncNode->pRaftStore->currentTerm; pSender->term = pSender->pSyncNode->pRaftStore->currentTerm;
pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN; pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID; pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册