未验证 提交 e3068a60 编写于 作者: L Li Minghao 提交者: GitHub

Merge pull request #13180 from taosdata/feature/3.0_mhli

enh(sync): add SSnapshot callback: reader, writer
......@@ -66,12 +66,6 @@ typedef struct SSyncCfg {
SNodeInfo nodeInfo[TSDB_MAX_REPLICA];
} SSyncCfg;
typedef struct SSnapshot {
void* data;
SyncIndex lastApplyIndex;
SyncTerm lastApplyTerm;
} SSnapshot;
typedef struct SFsmCbMeta {
SyncIndex index;
bool isWeak;
......@@ -93,6 +87,12 @@ typedef struct SReConfigCbMeta {
uint64_t flag;
} SReConfigCbMeta;
typedef struct SSnapshot {
void *data;
SyncIndex lastApplyIndex;
SyncTerm lastApplyTerm;
} SSnapshot;
typedef struct SSyncFSM {
void* data;
......@@ -101,23 +101,17 @@ typedef struct SSyncFSM {
void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
void (*FpRestoreFinishCb)(struct SSyncFSM* pFsm);
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
// if (*ppIter == NULL)
// *ppIter = new iter;
// else
// *ppIter.next();
//
// if success, return 0. else return error code
int32_t (*FpSnapshotRead)(struct SSyncFSM* pFsm, const SSnapshot* pSnapshot, void** ppIter, char** ppBuf,
int32_t* len);
void (*FpReConfigCb)(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta);
// apply data into fsm
int32_t (*FpSnapshotApply)(struct SSyncFSM* pFsm, const SSnapshot* pSnapshot, char* pBuf, int32_t len);
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
void (*FpReConfigCb)(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta);
int32_t (*FpSnapshotStartRead)(struct SSyncFSM* pFsm, void** ppReader);
int32_t (*FpSnapshotStopRead)(struct SSyncFSM* pFsm, void* pReader);
int32_t (*FpSnapshotDoRead)(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len);
// int32_t (*FpRestoreSnapshot)(struct SSyncFSM* pFsm, const SSnapshot* snapshot);
int32_t (*FpSnapshotStartWrite)(struct SSyncFSM* pFsm, void** ppWriter);
int32_t (*FpSnapshotStopWrite)(struct SSyncFSM* pFsm, void* pWriter, bool isApply);
int32_t (*FpSnapshotDoWrite)(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len);
} SSyncFSM;
......
......@@ -117,6 +117,8 @@ void mndReConfig(struct SSyncFSM *pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta)
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
memset(pFsm, 0, sizeof(*pFsm));
pFsm->data = pMnode;
pFsm->FpCommitCb = mndSyncCommitMsg;
......@@ -125,8 +127,6 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
pFsm->FpGetSnapshot = mndSyncGetSnapshot;
pFsm->FpRestoreFinishCb = mndRestoreFinish;
pFsm->FpSnapshotRead = mndSnapshotRead;
pFsm->FpSnapshotApply = mndSnapshotApply;
pFsm->FpReConfigCb = mndReConfig;
return pFsm;
......
......@@ -142,14 +142,13 @@ void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta
SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
memset(pFsm, 0, sizeof(*pFsm));
pFsm->data = pVnode;
pFsm->FpCommitCb = vnodeSyncCommitMsg;
pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
pFsm->FpGetSnapshot = vnodeSyncGetSnapshot;
pFsm->FpRestoreFinishCb = NULL;
pFsm->FpSnapshotRead = NULL;
pFsm->FpSnapshotApply = NULL;
pFsm->FpReConfigCb = NULL;
return pFsm;
......
......@@ -28,10 +28,12 @@ extern "C" {
#include "taosdef.h"
typedef struct SSyncSnapshotSender {
bool isStart;
int32_t progressIndex;
int32_t sending;
int32_t received;
bool finish;
void * pCurrentBlock;
int32_t len;
int32_t blockLen;
int64_t sendingMS;
SSnapshot *pSnapshot;
SSyncNode *pSyncNode;
} SSyncSnapshotSender;
......@@ -43,7 +45,8 @@ cJSON * snapshotSender2Json(SSyncSnapshotSender *pSender);
char * snapshotSender2Str(SSyncSnapshotSender *pSender);
typedef struct SSyncSnapshotReceiver {
bool isStart;
bool start;
int32_t received;
int32_t progressIndex;
void * pCurrentBlock;
int32_t len;
......
......@@ -58,8 +58,8 @@ int32_t raftCfgPersist(SRaftCfg *pRaftCfg) {
int64_t ret = taosWriteFile(pRaftCfg->pFile, buf, sizeof(buf));
assert(ret == sizeof(buf));
//int64_t ret = taosWriteFile(pRaftCfg->pFile, s, strlen(s) + 1);
//assert(ret == strlen(s) + 1);
// int64_t ret = taosWriteFile(pRaftCfg->pFile, s, strlen(s) + 1);
// assert(ret == strlen(s) + 1);
taosMemoryFree(s);
taosFsyncFile(pRaftCfg->pFile);
......@@ -170,7 +170,7 @@ int32_t raftCfgCreateFile(SSyncCfg *pCfg, int8_t isStandBy, const char *path) {
SRaftCfg raftCfg;
raftCfg.cfg = *pCfg;
raftCfg.isStandBy = isStandBy;
char * s = raftCfg2Str(&raftCfg);
char *s = raftCfg2Str(&raftCfg);
char buf[CONFIG_FILE_LEN] = {0};
memset(buf, 0, sizeof(buf));
......@@ -179,8 +179,8 @@ int32_t raftCfgCreateFile(SSyncCfg *pCfg, int8_t isStandBy, const char *path) {
int64_t ret = taosWriteFile(pFile, buf, sizeof(buf));
assert(ret == sizeof(buf));
//int64_t ret = taosWriteFile(pFile, s, strlen(s) + 1);
//assert(ret == strlen(s) + 1);
// int64_t ret = taosWriteFile(pFile, s, strlen(s) + 1);
// assert(ret == strlen(s) + 1);
taosMemoryFree(s);
taosCloseFile(&pFile);
......
......@@ -84,14 +84,15 @@ void ReConfigCb(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta)
SSyncFSM* createFsm() {
SSyncFSM* pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM));
memset(pFsm, 0, sizeof(*pFsm));
pFsm->FpCommitCb = CommitCb;
pFsm->FpPreCommitCb = PreCommitCb;
pFsm->FpRollBackCb = RollBackCb;
pFsm->FpGetSnapshot = GetSnapshotCb;
pFsm->FpRestoreFinishCb = RestoreFinishCb;
pFsm->FpSnapshotApply = NULL;
pFsm->FpSnapshotRead = NULL;
pFsm->FpReConfigCb = ReConfigCb;
......
......@@ -75,6 +75,7 @@ int32_t GetSnapshotCb(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) {
void initFsm() {
pFsm = (SSyncFSM *)taosMemoryMalloc(sizeof(SSyncFSM));
memset(pFsm, 0, sizeof(*pFsm));
pFsm->FpCommitCb = CommitCb;
pFsm->FpPreCommitCb = PreCommitCb;
pFsm->FpRollBackCb = RollBackCb;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册