提交 d04c1271 编写于 作者: M Minghao Li

fix: send snapshot

上级 48a331fd
......@@ -39,14 +39,15 @@ typedef struct SSyncSnapshotSender {
bool start;
int32_t seq;
int32_t ack;
void * pReader;
void * pCurrentBlock;
void *pReader;
void *pCurrentBlock;
int32_t blockLen;
SSnapshot snapshot;
int64_t sendingMS;
SSyncNode *pSyncNode;
int32_t replicaIndex;
SyncTerm term;
bool finish;
} SSyncSnapshotSender;
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex);
......@@ -55,14 +56,14 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender);
void snapshotSenderStop(SSyncSnapshotSender *pSender);
int32_t snapshotSend(SSyncSnapshotSender *pSender);
int32_t snapshotReSend(SSyncSnapshotSender *pSender);
cJSON * snapshotSender2Json(SSyncSnapshotSender *pSender);
char * snapshotSender2Str(SSyncSnapshotSender *pSender);
cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender);
char *snapshotSender2Str(SSyncSnapshotSender *pSender);
typedef struct SSyncSnapshotReceiver {
bool start;
int32_t ack;
void * pWriter;
void *pWriter;
SyncTerm term;
SSyncNode *pSyncNode;
......@@ -73,8 +74,8 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, int32_t repl
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver);
cJSON * snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver);
char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver);
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
int32_t syncNodeOnSnapshotSendCb(SSyncNode *ths, SyncSnapshotSend *pMsg);
int32_t syncNodeOnSnapshotRspCb(SSyncNode *ths, SyncSnapshotRsp *pMsg);
......
......@@ -471,12 +471,18 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
ASSERT(pMsg->prevLogIndex == snapshot.lastApplyIndex);
logOK = (pMsg->prevLogIndex == snapshot.lastApplyIndex) && (pMsg->prevLogTerm == snapshot.lastApplyTerm);
sTrace(
"1 - logOK:%d, pMsg->prevLogIndex:%ld, snapshot.lastApplyIndex:%ld, pMsg->prevLogTerm:%lu, "
"snapshot.lastApplyTerm:%lu",
logOK, pMsg->prevLogIndex, snapshot.lastApplyIndex, pMsg->prevLogTerm, snapshot.lastApplyTerm);
} else {
logOK = (pMsg->prevLogIndex == SYNC_INDEX_INVALID) ||
((pMsg->prevLogIndex >= SYNC_INDEX_BEGIN) &&
(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) &&
(pMsg->prevLogTerm == localPreLogTerm));
sTrace("2 - logOK:%d, pMsg->prevLogIndex:%ld, getLastIndex:%ld, pMsg->prevLogTerm:%lu, localPreLogTerm:%lu", logOK,
pMsg->prevLogIndex, ths->pLogStore->getLastIndex(ths->pLogStore), pMsg->prevLogTerm, localPreLogTerm);
}
// reject request
......
......@@ -75,7 +75,8 @@ int32_t syncIOSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
syncUtilMsgNtoH(pMsg->pCont);
char logBuf[256] = {0};
snprintf(logBuf, sizeof(logBuf), "==syncIOSendMsg== %s:%d msgType:%d", pEpSet->eps[0].fqdn, pEpSet->eps[0].port, pMsg->msgType);
snprintf(logBuf, sizeof(logBuf), "==syncIOSendMsg== %s:%d msgType:%d", pEpSet->eps[0].fqdn, pEpSet->eps[0].port,
pMsg->msgType);
syncRpcMsgLog2(logBuf, pMsg);
syncUtilMsgHtoN(pMsg->pCont);
......
......@@ -134,7 +134,22 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
// batch optimized
// SyncIndex lastIndex = syncUtilMinIndex(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore), nextIndex);
if (syncNodeIsIndexInSnapshot(pSyncNode, nextIndex)) {
// sending snapshot finish?
bool snapshotSendingFinish = false;
SSyncSnapshotSender* pSender = NULL;
for (int i = 0; i < pSyncNode->replicaNum; ++i) {
if (syncUtilSameId(pDestId, &((pSyncNode->replicasId)[i]))) {
pSender = (pSyncNode->senders)[i];
}
}
ASSERT(pSender != NULL);
snapshotSendingFinish = (pSender->finish) && (pSender->term == pSyncNode->pRaftStore->currentTerm);
if (snapshotSendingFinish) {
sInfo("snapshotSendingFinish! term:%lu", pSender->term);
}
if ((syncNodeIsIndexInSnapshot(pSyncNode, nextIndex - 1) && !snapshotSendingFinish) ||
syncNodeIsIndexInSnapshot(pSyncNode, nextIndex)) {
// will send this msg until snapshot receive finish!
SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
......@@ -142,14 +157,10 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
snapshot.lastApplyIndex, snapshot.lastApplyTerm);
// do not use next index
// always send from new last index + 1
SyncIndex lastIndex;
SyncTerm lastTerm;
ret = syncNodeGetLastIndexTerm(pSyncNode, &lastIndex, &lastTerm);
ASSERT(ret == 0);
// always send from snapshot.lastApplyIndex + 1, and wait for snapshot transfer finish
ret = syncNodeGetPreIndexTerm(pSyncNode, lastIndex + 1, &preLogIndex, &preLogTerm);
ASSERT(ret == 0);
preLogIndex = snapshot.lastApplyIndex;
preLogTerm = snapshot.lastApplyTerm;
// to claim leader
SyncAppendEntries* pMsg = syncAppendEntriesBuild(0, pSyncNode->vgId);
......
......@@ -40,6 +40,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
pSender->pSyncNode = pSyncNode;
pSender->replicaIndex = replicaIndex;
pSender->term = pSyncNode->pRaftStore->currentTerm;
pSender->finish = false;
} else {
sInfo("snapshotSenderCreate cannot create sender");
}
......@@ -270,6 +271,7 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
cJSON_AddNumberToObject(pRoot, "replicaIndex", pSender->replicaIndex);
snprintf(u64buf, sizeof(u64buf), "%lu", pSender->term);
cJSON_AddStringToObject(pRoot, "term", u64buf);
cJSON_AddNumberToObject(pRoot, "finish", pSender->finish);
}
cJSON *pJson = cJSON_CreateObject();
......@@ -435,7 +437,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
pReceiver->pWriter = NULL;
snapshotReceiverStop(pReceiver);
pReceiver->ack = pMsg->seq;
needRsp = false;
needRsp = true;
char *msgStr = syncSnapshotSend2Str(pMsg);
sTrace("snapshot recv end ack:%d recv msg:%s", pReceiver->ack, msgStr);
......@@ -506,6 +508,7 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
// receiver ack is finish, close sender
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
pSender->finish = true;
snapshotSenderStop(pSender);
return 0;
}
......
......@@ -129,7 +129,7 @@ int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply) {
sTrace("%s", logBuf);
if (isApply) {
gSnapshotLastApplyIndex = 7;
gSnapshotLastApplyIndex = 10;
gSnapshotLastApplyTerm = 1;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册