提交 a5d3b703 编写于 作者: M Minghao Li

refactor(sync): add snapshot writer param

上级 856806bb
...@@ -96,6 +96,11 @@ typedef struct SReConfigCbMeta { ...@@ -96,6 +96,11 @@ typedef struct SReConfigCbMeta {
} SReConfigCbMeta; } SReConfigCbMeta;
typedef struct SSnapshotParam {
SyncIndex start;
SyncIndex end;
} SSnapshotParam;
typedef struct SSnapshot { typedef struct SSnapshot {
void* data; void* data;
SyncIndex lastApplyIndex; SyncIndex lastApplyIndex;
...@@ -125,7 +130,7 @@ typedef struct SSyncFSM { ...@@ -125,7 +130,7 @@ typedef struct SSyncFSM {
int32_t (*FpSnapshotStopRead)(struct SSyncFSM* pFsm, void* pReader); int32_t (*FpSnapshotStopRead)(struct SSyncFSM* pFsm, void* pReader);
int32_t (*FpSnapshotDoRead)(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len); int32_t (*FpSnapshotDoRead)(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len);
int32_t (*FpSnapshotStartWrite)(struct SSyncFSM* pFsm, void** ppWriter); int32_t (*FpSnapshotStartWrite)(struct SSyncFSM* pFsm, void* pWriterParam, void** ppWriter);
int32_t (*FpSnapshotStopWrite)(struct SSyncFSM* pFsm, void* pWriter, bool isApply); int32_t (*FpSnapshotStopWrite)(struct SSyncFSM* pFsm, void* pWriter, bool isApply);
int32_t (*FpSnapshotDoWrite)(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len); int32_t (*FpSnapshotDoWrite)(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len);
......
...@@ -483,9 +483,10 @@ typedef struct SyncSnapshotSend { ...@@ -483,9 +483,10 @@ typedef struct SyncSnapshotSend {
SRaftId destId; SRaftId destId;
SyncTerm term; SyncTerm term;
SyncIndex lastIndex; // lastIndex of snapshot SyncIndex beginIndex; // snapshot.beginIndex
SyncTerm lastTerm; // lastTerm of snapshot SyncIndex lastIndex; // snapshot.lastIndex
SyncIndex lastConfigIndex; SyncTerm lastTerm; // snapshot.lastTerm
SyncIndex lastConfigIndex; // snapshot.lastConfigIndex
SSyncCfg lastConfig; SSyncCfg lastConfig;
SyncTerm privateTerm; SyncTerm privateTerm;
int32_t seq; int32_t seq;
......
...@@ -134,7 +134,7 @@ int32_t mndSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, in ...@@ -134,7 +134,7 @@ int32_t mndSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, in
return sdbDoRead(pMnode->pSdb, pReader, ppBuf, len); return sdbDoRead(pMnode->pSdb, pReader, ppBuf, len);
} }
int32_t mndSnapshotStartWrite(struct SSyncFSM *pFsm, void **ppWriter) { int32_t mndSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void **ppWriter) {
mInfo("start to apply snapshot to sdb"); mInfo("start to apply snapshot to sdb");
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
return sdbStartWrite(pMnode->pSdb, (SSdbIter **)ppWriter); return sdbStartWrite(pMnode->pSdb, (SSdbIter **)ppWriter);
......
...@@ -415,7 +415,7 @@ static int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { ret ...@@ -415,7 +415,7 @@ static int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { ret
static int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { return 0; } static int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { return 0; }
static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void **ppWriter) { return 0; } static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void **ppWriter) { return 0; }
static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply) { return 0; } static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply) { return 0; }
......
...@@ -44,11 +44,6 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p ...@@ -44,11 +44,6 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p
int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
typedef struct SReaderParam {
SyncIndex start;
SyncIndex end;
} SReaderParam;
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -37,44 +37,47 @@ extern "C" { ...@@ -37,44 +37,47 @@ extern "C" {
//--------------------------------------------------- //---------------------------------------------------
typedef struct SSyncSnapshotSender { typedef struct SSyncSnapshotSender {
bool start; bool start;
int32_t seq; int32_t seq;
int32_t ack; int32_t ack;
void * pReader; void *pReader;
void * pCurrentBlock; void *pCurrentBlock;
int32_t blockLen; int32_t blockLen;
SSnapshot snapshot; SSnapshotParam snapshotParam;
SSyncCfg lastConfig; SSnapshot snapshot;
int64_t sendingMS; SSyncCfg lastConfig;
SSyncNode *pSyncNode; int64_t sendingMS;
int32_t replicaIndex; SSyncNode *pSyncNode;
SyncTerm term; int32_t replicaIndex;
SyncTerm privateTerm; SyncTerm term;
bool finish; SyncTerm privateTerm;
bool finish;
} SSyncSnapshotSender; } SSyncSnapshotSender;
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex); SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex);
void snapshotSenderDestroy(SSyncSnapshotSender *pSender); void snapshotSenderDestroy(SSyncSnapshotSender *pSender);
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender); bool snapshotSenderIsStart(SSyncSnapshotSender *pSender);
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void *pReader); int32_t snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshotParam snapshotParam, SSnapshot snapshot,
void *pReader);
int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish); int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish);
int32_t snapshotSend(SSyncSnapshotSender *pSender); int32_t snapshotSend(SSyncSnapshotSender *pSender);
int32_t snapshotReSend(SSyncSnapshotSender *pSender); int32_t snapshotReSend(SSyncSnapshotSender *pSender);
cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender); cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender);
char * snapshotSender2Str(SSyncSnapshotSender *pSender); char *snapshotSender2Str(SSyncSnapshotSender *pSender);
char * snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event); char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event);
//--------------------------------------------------- //---------------------------------------------------
typedef struct SSyncSnapshotReceiver { typedef struct SSyncSnapshotReceiver {
bool start; bool start;
int32_t ack; int32_t ack;
void * pWriter; void *pWriter;
SyncTerm term; SyncTerm term;
SyncTerm privateTerm; SyncTerm privateTerm;
SSnapshot snapshot; SSnapshotParam snapshotParam;
SRaftId fromId; SSnapshot snapshot;
SSyncNode *pSyncNode; SRaftId fromId;
SSyncNode *pSyncNode;
} SSyncSnapshotReceiver; } SSyncSnapshotReceiver;
...@@ -85,8 +88,8 @@ int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver); ...@@ -85,8 +88,8 @@ int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver);
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver); bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver);
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver);
char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
char * snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event); char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event);
//--------------------------------------------------- //---------------------------------------------------
// on message // on message
......
...@@ -118,12 +118,12 @@ static void syncNodeStartSnapshot(SSyncNode* ths, SyncIndex beginIndex, SyncInde ...@@ -118,12 +118,12 @@ static void syncNodeStartSnapshot(SSyncNode* ths, SyncIndex beginIndex, SyncInde
SSnapshot snapshot = { SSnapshot snapshot = {
.data = NULL, .lastApplyIndex = endIndex, .lastApplyTerm = lastApplyTerm, .lastConfigIndex = SYNC_INDEX_INVALID}; .data = NULL, .lastApplyIndex = endIndex, .lastApplyTerm = lastApplyTerm, .lastConfigIndex = SYNC_INDEX_INVALID};
void* pReader = NULL; void* pReader = NULL;
SReaderParam readerParam = {.start = beginIndex, .end = endIndex}; SSnapshotParam readerParam = {.start = beginIndex, .end = endIndex};
ths->pFsm->FpSnapshotStartRead(ths->pFsm, &readerParam, &pReader); ths->pFsm->FpSnapshotStartRead(ths->pFsm, &readerParam, &pReader);
if (!snapshotSenderIsStart(pSender) && pMsg->privateTerm < pSender->privateTerm) { if (!snapshotSenderIsStart(pSender) && pMsg->privateTerm < pSender->privateTerm) {
ASSERT(pReader != NULL); ASSERT(pReader != NULL);
snapshotSenderStart(pSender, snapshot, pReader); snapshotSenderStart(pSender, readerParam, snapshot, pReader);
} else { } else {
if (pReader != NULL) { if (pReader != NULL) {
...@@ -300,7 +300,8 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries ...@@ -300,7 +300,8 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
!snapshotSenderIsStart(pSender) && pMsg->privateTerm < pSender->privateTerm) { !snapshotSenderIsStart(pSender) && pMsg->privateTerm < pSender->privateTerm) {
// has snapshot // has snapshot
ASSERT(pReader != NULL); ASSERT(pReader != NULL);
snapshotSenderStart(pSender, snapshot, pReader); SSnapshotParam readerParam = {.start = 0, .end = snapshot.lastApplyIndex};
snapshotSenderStart(pSender, readerParam, snapshot, pReader);
} else { } else {
// no snapshot // no snapshot
......
...@@ -2254,6 +2254,9 @@ cJSON* syncSnapshotSend2Json(const SyncSnapshotSend* pMsg) { ...@@ -2254,6 +2254,9 @@ cJSON* syncSnapshotSend2Json(const SyncSnapshotSend* pMsg) {
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->privateTerm); snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->privateTerm);
cJSON_AddStringToObject(pRoot, "privateTerm", u64buf); cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->beginIndex);
cJSON_AddStringToObject(pRoot, "beginIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->lastIndex); snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->lastIndex);
cJSON_AddStringToObject(pRoot, "lastIndex", u64buf); cJSON_AddStringToObject(pRoot, "lastIndex", u64buf);
......
...@@ -80,13 +80,15 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) { ...@@ -80,13 +80,15 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) { return pSender->start; } bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) { return pSender->start; }
// begin send snapshot by snapshot, pReader // begin send snapshot by snapshot, pReader
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void *pReader) { int32_t snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshotParam snapshotParam, SSnapshot snapshot,
void *pReader) {
ASSERT(!snapshotSenderIsStart(pSender)); ASSERT(!snapshotSenderIsStart(pSender));
// init snapshot and reader // init snapshot, parm, reader
ASSERT(pSender->pReader == NULL); ASSERT(pSender->pReader == NULL);
pSender->pReader = pReader; pSender->pReader = pReader;
pSender->snapshot = snapshot; pSender->snapshot = snapshot;
pSender->snapshotParam = snapshotParam;
// init current block // init current block
if (pSender->pCurrentBlock != NULL) { if (pSender->pCurrentBlock != NULL) {
...@@ -162,6 +164,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, vo ...@@ -162,6 +164,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, vo
pMsg->srcId = pSender->pSyncNode->myRaftId; pMsg->srcId = pSender->pSyncNode->myRaftId;
pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex]; pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm; pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
pMsg->beginIndex = pSender->snapshotParam.start;
pMsg->lastIndex = pSender->snapshot.lastApplyIndex; pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
pMsg->lastTerm = pSender->snapshot.lastApplyTerm; pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex; pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
...@@ -353,14 +356,14 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { ...@@ -353,14 +356,14 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
char *snapshotSender2Str(SSyncSnapshotSender *pSender) { char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
cJSON *pJson = snapshotSender2Json(pSender); cJSON *pJson = snapshotSender2Json(pSender);
char * serialized = cJSON_Print(pJson); char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }
char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) { char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) {
int32_t len = 256; int32_t len = 256;
char * s = taosMemoryMalloc(len); char *s = taosMemoryMalloc(len);
SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
char host[64]; char host[64];
...@@ -439,10 +442,13 @@ static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm p ...@@ -439,10 +442,13 @@ static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm p
pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex; pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex;
pReceiver->snapshot.lastApplyTerm = pBeginMsg->lastTerm; pReceiver->snapshot.lastApplyTerm = pBeginMsg->lastTerm;
pReceiver->snapshot.lastConfigIndex = pBeginMsg->lastConfigIndex; pReceiver->snapshot.lastConfigIndex = pBeginMsg->lastConfigIndex;
pReceiver->snapshotParam.start = pBeginMsg->beginIndex;
pReceiver->snapshotParam.end = pBeginMsg->lastIndex;
// write data // write data
ASSERT(pReceiver->pWriter == NULL); ASSERT(pReceiver->pWriter == NULL);
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &(pReceiver->pWriter)); int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm,
&(pReceiver->snapshotParam), &(pReceiver->pWriter));
ASSERT(ret == 0); ASSERT(ret == 0);
// event log // event log
...@@ -612,7 +618,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { ...@@ -612,7 +618,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
cJSON_AddStringToObject(pFromId, "addr", u64buf); cJSON_AddStringToObject(pFromId, "addr", u64buf);
{ {
uint64_t u64 = pReceiver->fromId.addr; uint64_t u64 = pReceiver->fromId.addr;
cJSON * pTmp = pFromId; cJSON *pTmp = pFromId;
char host[128] = {0}; char host[128] = {0};
uint16_t port; uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port); syncUtilU642Addr(u64, host, sizeof(host), &port);
...@@ -645,14 +651,14 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { ...@@ -645,14 +651,14 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
cJSON *pJson = snapshotReceiver2Json(pReceiver); cJSON *pJson = snapshotReceiver2Json(pReceiver);
char * serialized = cJSON_Print(pJson); char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }
char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) { char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) {
int32_t len = 256; int32_t len = 256;
char * s = taosMemoryMalloc(len); char *s = taosMemoryMalloc(len);
SRaftId fromId = pReceiver->fromId; SRaftId fromId = pReceiver->fromId;
char host[128]; char host[128];
......
...@@ -114,7 +114,7 @@ int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32 ...@@ -114,7 +114,7 @@ int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32
return 0; return 0;
} }
int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void** ppWriter) { int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void *pParam, void** ppWriter) {
*ppWriter = (void*)0xCDEF; *ppWriter = (void*)0xCDEF;
char logBuf[256] = {0}; char logBuf[256] = {0};
snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStartWrite== pFsm:%p, *ppWriter:%p", pFsm, *ppWriter); snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStartWrite== pFsm:%p, *ppWriter:%p", pFsm, *ppWriter);
......
...@@ -29,7 +29,7 @@ int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void** ppReader) { return 0; } ...@@ -29,7 +29,7 @@ int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void** ppReader) { return 0; }
int32_t SnapshotStopRead(struct SSyncFSM* pFsm, void* pReader) { return 0; } int32_t SnapshotStopRead(struct SSyncFSM* pFsm, void* pReader) { return 0; }
int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len) { return 0; } int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len) { return 0; }
int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void** ppWriter) { return 0; } int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void *pParam, void** ppWriter) { return 0; }
int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply) { return 0; } int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply) { return 0; }
int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len) { return 0; } int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len) { return 0; }
......
...@@ -111,7 +111,7 @@ int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32 ...@@ -111,7 +111,7 @@ int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32
return 0; return 0;
} }
int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void** ppWriter) { int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void *pParam, void** ppWriter) {
*ppWriter = (void*)0xCDEF; *ppWriter = (void*)0xCDEF;
char logBuf[256] = {0}; char logBuf[256] = {0};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册