提交 31b54e74 编写于 作者: M Minghao Li

enh(sync): add privateTerm into SyncSnapshotRsp, SyncSnapshotSend, SSyncSnapshotReceiver

上级 78521883
......@@ -398,6 +398,7 @@ typedef struct SyncSnapshotSend {
SyncTerm term;
SyncIndex lastIndex; // lastIndex of snapshot
SyncTerm lastTerm; // lastTerm of snapshot
SyncTerm privateTerm;
int32_t seq;
uint32_t dataLen;
char data[];
......@@ -432,6 +433,7 @@ typedef struct SyncSnapshotRsp {
SyncTerm term;
SyncIndex lastIndex;
SyncTerm lastTerm;
SyncTerm privateTerm;
int32_t ack;
} SyncSnapshotRsp;
......
......@@ -41,8 +41,8 @@ void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr);
void syncIndexMgrClear(SSyncIndexMgr *pSyncIndexMgr);
void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncIndex index);
SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId);
cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr);
char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr);
cJSON * syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr);
char * syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr);
void syncIndexMgrSetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncTerm term);
SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId);
......
......@@ -39,8 +39,8 @@ typedef struct SSyncSnapshotSender {
bool start;
int32_t seq;
int32_t ack;
void *pReader;
void *pCurrentBlock;
void * pReader;
void * pCurrentBlock;
int32_t blockLen;
SSnapshot snapshot;
int64_t sendingMS;
......@@ -55,19 +55,20 @@ void snapshotSenderDoStart(SSyncSnapshotSender *pSender);
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex);
void snapshotSenderDestroy(SSyncSnapshotSender *pSender);
void snapshotSenderStart(SSyncSnapshotSender *pSender);
void snapshotSenderStop(SSyncSnapshotSender *pSender);
int32_t snapshotSend(SSyncSnapshotSender *pSender);
int32_t snapshotReSend(SSyncSnapshotSender *pSender);
cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender);
char *snapshotSender2Str(SSyncSnapshotSender *pSender);
// void snapshotSenderStart(SSyncSnapshotSender *pSender);
void snapshotSenderStop(SSyncSnapshotSender *pSender);
int32_t snapshotSend(SSyncSnapshotSender *pSender);
int32_t snapshotReSend(SSyncSnapshotSender *pSender);
cJSON * snapshotSender2Json(SSyncSnapshotSender *pSender);
char * snapshotSender2Str(SSyncSnapshotSender *pSender);
typedef struct SSyncSnapshotReceiver {
bool start;
int32_t ack;
void *pWriter;
void * pWriter;
SyncTerm term;
SyncTerm privateTerm;
SSyncNode *pSyncNode;
int32_t replicaIndex;
......@@ -77,8 +78,8 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, int32_t repl
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver);
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver);
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
cJSON * snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver);
char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
int32_t syncNodeOnSnapshotSendCb(SSyncNode *ths, SyncSnapshotSend *pMsg);
int32_t syncNodeOnSnapshotRspCb(SSyncNode *ths, SyncSnapshotRsp *pMsg);
......
......@@ -576,6 +576,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
pReply->term = ths->pRaftStore->currentTerm;
pReply->success = false;
pReply->matchIndex = SYNC_INDEX_INVALID;
pReply->privateTerm = pMsg->privateTerm;
SRpcMsg rpcMsg;
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
......@@ -660,6 +661,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
pReply->destId = pMsg->srcId;
pReply->term = ths->pRaftStore->currentTerm;
pReply->success = true;
pReply->privateTerm = pMsg->privateTerm;
if (hasAppendEntries) {
pReply->matchIndex = pMsg->prevLogIndex + 1;
......
......@@ -175,6 +175,8 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
ASSERT(pSender != NULL);
SyncIndex sentryIndex;
#if 0
SyncIndex sentryIndex;
if (pSender->start && pSender->term == ths->pRaftStore->currentTerm) {
// already start
sentryIndex = pSender->snapshot.lastApplyIndex;
......@@ -182,14 +184,26 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
ths->pRaftStore->currentTerm);
} else {
// start send snapshot, first time
sTrace("sending snapshot start first: pSender->term:%lu, ths->pRaftStore->currentTerm:%lu", pSender->term,
ths->pRaftStore->currentTerm);
if (pMsg->privateTerm == pSender->privateTerm) {
sTrace("same privateTerm, pMsg->privateTerm:%lu, pSender->privateTerm:%lu, do not start snapshot again",
pMsg->privateTerm, pSender->privateTerm);
} else {
// start send snapshot, first time
sTrace(
"sending snapshot start first: pSender->term:%lu, ths->pRaftStore->currentTerm:%lu, "
"pMsg->privateTerm:%lu, pSender->privateTerm:%lu",
pSender->term, ths->pRaftStore->currentTerm, pMsg->privateTerm, pSender->privateTerm);
snapshotSenderDoStart(pSender);
pSender->start = true;
// update snapshot private term
syncIndexMgrSetTerm(ths->pNextIndex, &(pMsg->srcId), pSender->privateTerm);
}
snapshotSenderDoStart(pSender);
pSender->start = true;
sentryIndex = pSender->snapshot.lastApplyIndex;
}
#endif
// update nextIndex to sentryIndex + 1
if (nextIndex <= sentryIndex) {
......
......@@ -119,7 +119,7 @@ cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) {
char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr) {
cJSON *pJson = syncIndexMgr2Json(pSyncIndexMgr);
char *serialized = cJSON_Print(pJson);
char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
......
......@@ -1815,6 +1815,9 @@ cJSON* syncSnapshotSend2Json(const SyncSnapshotSend* pMsg) {
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term);
cJSON_AddStringToObject(pRoot, "term", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->privateTerm);
cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->lastIndex);
cJSON_AddStringToObject(pRoot, "lastIndex", u64buf);
......@@ -1978,6 +1981,9 @@ cJSON* syncSnapshotRsp2Json(const SyncSnapshotRsp* pMsg) {
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term);
cJSON_AddStringToObject(pRoot, "term", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->privateTerm);
cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->lastIndex);
cJSON_AddStringToObject(pRoot, "lastIndex", u64buf);
......
......@@ -177,6 +177,7 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
pMsg->prevLogIndex = preLogIndex;
pMsg->prevLogTerm = preLogTerm;
pMsg->commitIndex = pSyncNode->commitIndex;
pMsg->privateTerm = syncIndexMgrGetTerm(pSyncNode->pNextIndex, pDestId);
// send msg
syncNodeAppendEntries(pSyncNode, pDestId, pMsg);
......
......@@ -14,6 +14,7 @@
*/
#include "syncSnapshot.h"
#include "syncIndexMgr.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
......@@ -42,6 +43,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
pSender->pSyncNode = pSyncNode;
pSender->replicaIndex = replicaIndex;
pSender->term = pSyncNode->pRaftStore->currentTerm;
pSender->privateTerm = 0;
pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot));
pSender->finish = false;
......@@ -90,9 +92,13 @@ void snapshotSenderDoStart(SSyncSnapshotSender *pSender) {
sTrace("snapshot send begin seq:%d ack:%d send msg:%s", pSender->seq, pSender->ack, msgStr);
taosMemoryFree(msgStr);
// when start, increase term
++(pSender->privateTerm);
syncSnapshotSendDestroy(pMsg);
}
#if 0
// when entry in snapshot, start sender
void snapshotSenderStart(SSyncSnapshotSender *pSender) {
if (!(pSender->start)) {
......@@ -142,6 +148,7 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) {
sInfo("snapshotSenderStart %s", s);
taosMemoryFree(s);
}
#endif
void snapshotSenderStop(SSyncSnapshotSender *pSender) {
if (pSender->pReader != NULL) {
......@@ -392,6 +399,9 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
cJSON_AddNumberToObject(pRoot, "replicaIndex", pReceiver->replicaIndex);
snprintf(u64buf, sizeof(u64buf), "%lu", pReceiver->term);
cJSON_AddStringToObject(pRoot, "term", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", pReceiver->privateTerm);
cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
}
cJSON *pJson = cJSON_CreateObject();
......@@ -514,6 +524,10 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
pSender->finish = true;
snapshotSenderStop(pSender);
// update nextIndex private term
syncIndexMgrSetTerm(pSyncNode->pNextIndex, &(pMsg->srcId), pSender->privateTerm);
return 0;
}
......
......@@ -25,7 +25,6 @@ SRaftId ids[TSDB_MAX_REPLICA];
SSyncNode* pSyncNode;
SSyncNode* syncNodeInit() {
pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode));
memset(pSyncNode, 0, sizeof(SSyncNode));
pSyncNode->replicaNum = replicaNum;
......@@ -81,7 +80,7 @@ int main(int argc, char** argv) {
printf("---------------------------------------\n");
for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
SyncIndex idx = syncIndexMgrGetIndex(pSyncIndexMgr, &ids[i]);
SyncTerm term = syncIndexMgrGetTerm(pSyncIndexMgr, &ids[i]);
SyncTerm term = syncIndexMgrGetTerm(pSyncIndexMgr, &ids[i]);
printf("%d: index:%ld term:%lu \n", i, idx, term);
}
printf("---------------------------------------\n");
......
......@@ -46,6 +46,7 @@ SSyncSnapshotReceiver* createReceiver() {
pReceiver->ack = 20;
pReceiver->pWriter = (void*)0x11;
pReceiver->term = 66;
pReceiver->privateTerm = 99;
return pReceiver;
}
......
......@@ -21,6 +21,7 @@ SyncSnapshotRsp *createMsg() {
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678);
pMsg->destId.vgId = 100;
pMsg->term = 11;
pMsg->privateTerm = 99;
pMsg->lastIndex = 22;
pMsg->lastTerm = 33;
pMsg->ack = 44;
......
......@@ -21,6 +21,7 @@ SyncSnapshotSend *createMsg() {
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678);
pMsg->destId.vgId = 100;
pMsg->term = 11;
pMsg->privateTerm = 99;
pMsg->lastIndex = 22;
pMsg->lastTerm = 33;
pMsg->seq = 44;
......
......@@ -308,7 +308,7 @@ int main(int argc, char** argv) {
// check parameter
assert(replicaNum >= 1 && replicaNum <= 5);
//assert(myIndex >= 0 && myIndex < replicaNum);
// assert(myIndex >= 0 && myIndex < replicaNum);
assert(lastApplyIndex >= -1);
assert(lastApplyTerm >= 0);
assert(writeRecordNum >= 0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册