提交 bdc9acbf 编写于 作者: M Minghao Li

enh(sync): add snapshotEnable

上级 cd2e8fe7
...@@ -88,7 +88,7 @@ typedef struct SReConfigCbMeta { ...@@ -88,7 +88,7 @@ typedef struct SReConfigCbMeta {
} SReConfigCbMeta; } SReConfigCbMeta;
typedef struct SSnapshot { typedef struct SSnapshot {
void *data; void* data;
SyncIndex lastApplyIndex; SyncIndex lastApplyIndex;
SyncTerm lastApplyTerm; SyncTerm lastApplyTerm;
} SSnapshot; } SSnapshot;
...@@ -145,6 +145,7 @@ typedef struct SSyncLogStore { ...@@ -145,6 +145,7 @@ typedef struct SSyncLogStore {
typedef struct SSyncInfo { typedef struct SSyncInfo {
bool isStandBy; bool isStandBy;
bool snapshotEnable;
SyncGroupId vgId; SyncGroupId vgId;
SSyncCfg syncCfg; SSyncCfg syncCfg;
char path[TSDB_FILENAME_LEN]; char path[TSDB_FILENAME_LEN];
......
...@@ -146,6 +146,7 @@ int32_t mndInitSync(SMnode *pMnode) { ...@@ -146,6 +146,7 @@ int32_t mndInitSync(SMnode *pMnode) {
syncInfo.pWal = pMgmt->pWal; syncInfo.pWal = pMgmt->pWal;
syncInfo.pFsm = mndSyncMakeFsm(pMnode); syncInfo.pFsm = mndSyncMakeFsm(pMnode);
syncInfo.isStandBy = pMgmt->standby; syncInfo.isStandBy = pMgmt->standby;
syncInfo.snapshotEnable = true;
SSyncCfg *pCfg = &syncInfo.syncCfg; SSyncCfg *pCfg = &syncInfo.syncCfg;
pCfg->replicaNum = pMnode->replica; pCfg->replicaNum = pMnode->replica;
......
...@@ -26,6 +26,8 @@ static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot); ...@@ -26,6 +26,8 @@ static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot);
int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
SSyncInfo syncInfo = { SSyncInfo syncInfo = {
.isStandBy = false,
.snapshotEnable = false,
.vgId = pVnode->config.vgId, .vgId = pVnode->config.vgId,
.syncCfg = pVnode->config.syncCfg, .syncCfg = pVnode->config.syncCfg,
.pWal = pVnode->pWal, .pWal = pVnode->pWal,
......
...@@ -39,6 +39,7 @@ extern "C" { ...@@ -39,6 +39,7 @@ extern "C" {
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>> // /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
// //
int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode); int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode);
int32_t syncNodeRequestVotePeersSnapshot(SSyncNode* pSyncNode);
int32_t syncNodeElect(SSyncNode* pSyncNode); int32_t syncNodeElect(SSyncNode* pSyncNode);
int32_t syncNodeRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg); int32_t syncNodeRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg);
......
...@@ -34,6 +34,7 @@ typedef struct SRaftCfg { ...@@ -34,6 +34,7 @@ typedef struct SRaftCfg {
TdFilePtr pFile; TdFilePtr pFile;
char path[TSDB_FILENAME_LEN * 2]; char path[TSDB_FILENAME_LEN * 2];
int8_t isStandBy; int8_t isStandBy;
int8_t snapshotEnable;
} SRaftCfg; } SRaftCfg;
SRaftCfg *raftCfgOpen(const char *path); SRaftCfg *raftCfgOpen(const char *path);
...@@ -50,7 +51,12 @@ char * raftCfg2Str(SRaftCfg *pRaftCfg); ...@@ -50,7 +51,12 @@ char * raftCfg2Str(SRaftCfg *pRaftCfg);
int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg); int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg);
int32_t raftCfgFromStr(const char *s, SRaftCfg *pRaftCfg); int32_t raftCfgFromStr(const char *s, SRaftCfg *pRaftCfg);
int32_t raftCfgCreateFile(SSyncCfg *pCfg, int8_t isStandBy, const char *path); typedef struct SRaftCfgMeta {
int8_t isStandBy;
int8_t snapshotEnable;
} SRaftCfgMeta;
int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path);
// for debug ---------------------- // for debug ----------------------
void syncCfgPrint(SSyncCfg *pCfg); void syncCfgPrint(SSyncCfg *pCfg);
......
...@@ -49,6 +49,8 @@ int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) { ...@@ -49,6 +49,8 @@ int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) {
return ret; return ret;
} }
int32_t syncNodeRequestVotePeersSnapshot(SSyncNode* pSyncNode) { return 0; }
int32_t syncNodeElect(SSyncNode* pSyncNode) { int32_t syncNodeElect(SSyncNode* pSyncNode) {
int32_t ret = 0; int32_t ret = 0;
if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) { if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
......
...@@ -411,8 +411,11 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { ...@@ -411,8 +411,11 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s/raft_config.json", pSyncInfo->path); snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s/raft_config.json", pSyncInfo->path);
if (!taosCheckExistFile(pSyncNode->configPath)) { if (!taosCheckExistFile(pSyncNode->configPath)) {
// create raft config file // create a new raft config file
ret = raftCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), pSyncInfo->isStandBy, pSyncNode->configPath); SRaftCfgMeta meta;
meta.isStandBy = pSyncInfo->isStandBy;
meta.snapshotEnable = pSyncInfo->snapshotEnable;
ret = raftCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), meta, pSyncNode->configPath);
assert(ret == 0); assert(ret == 0);
} else { } else {
......
...@@ -148,6 +148,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) { ...@@ -148,6 +148,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
cJSON *pRoot = cJSON_CreateObject(); cJSON *pRoot = cJSON_CreateObject();
cJSON_AddItemToObject(pRoot, "SSyncCfg", syncCfg2Json(&(pRaftCfg->cfg))); cJSON_AddItemToObject(pRoot, "SSyncCfg", syncCfg2Json(&(pRaftCfg->cfg)));
cJSON_AddNumberToObject(pRoot, "isStandBy", pRaftCfg->isStandBy); cJSON_AddNumberToObject(pRoot, "isStandBy", pRaftCfg->isStandBy);
cJSON_AddNumberToObject(pRoot, "snapshotEnable", pRaftCfg->snapshotEnable);
cJSON *pJson = cJSON_CreateObject(); cJSON *pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "RaftCfg", pRoot); cJSON_AddItemToObject(pJson, "RaftCfg", pRoot);
...@@ -161,7 +162,7 @@ char *raftCfg2Str(SRaftCfg *pRaftCfg) { ...@@ -161,7 +162,7 @@ char *raftCfg2Str(SRaftCfg *pRaftCfg) {
return serialized; return serialized;
} }
int32_t raftCfgCreateFile(SSyncCfg *pCfg, int8_t isStandBy, const char *path) { int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path) {
assert(pCfg != NULL); assert(pCfg != NULL);
TdFilePtr pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE); TdFilePtr pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE);
...@@ -169,7 +170,8 @@ int32_t raftCfgCreateFile(SSyncCfg *pCfg, int8_t isStandBy, const char *path) { ...@@ -169,7 +170,8 @@ int32_t raftCfgCreateFile(SSyncCfg *pCfg, int8_t isStandBy, const char *path) {
SRaftCfg raftCfg; SRaftCfg raftCfg;
raftCfg.cfg = *pCfg; raftCfg.cfg = *pCfg;
raftCfg.isStandBy = isStandBy; raftCfg.isStandBy = meta.isStandBy;
raftCfg.snapshotEnable = meta.snapshotEnable;
char *s = raftCfg2Str(&raftCfg); char *s = raftCfg2Str(&raftCfg);
char buf[CONFIG_FILE_LEN]; char buf[CONFIG_FILE_LEN];
...@@ -194,6 +196,9 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) { ...@@ -194,6 +196,9 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) {
cJSON *pJsonIsStandBy = cJSON_GetObjectItem(pJson, "isStandBy"); cJSON *pJsonIsStandBy = cJSON_GetObjectItem(pJson, "isStandBy");
pRaftCfg->isStandBy = cJSON_GetNumberValue(pJsonIsStandBy); pRaftCfg->isStandBy = cJSON_GetNumberValue(pJsonIsStandBy);
cJSON *pJsonSnapshotEnable = cJSON_GetObjectItem(pJson, "snapshotEnable");
pRaftCfg->snapshotEnable = cJSON_GetNumberValue(pJsonSnapshotEnable);
cJSON * pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg"); cJSON * pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg)); int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg));
ASSERT(code == 0); ASSERT(code == 0);
......
...@@ -93,7 +93,6 @@ SSyncFSM* createFsm() { ...@@ -93,7 +93,6 @@ SSyncFSM* createFsm() {
pFsm->FpGetSnapshot = GetSnapshotCb; pFsm->FpGetSnapshot = GetSnapshotCb;
pFsm->FpRestoreFinishCb = RestoreFinishCb; pFsm->FpRestoreFinishCb = RestoreFinishCb;
pFsm->FpReConfigCb = ReConfigCb; pFsm->FpReConfigCb = ReConfigCb;
return pFsm; return pFsm;
......
...@@ -71,7 +71,10 @@ void test3() { ...@@ -71,7 +71,10 @@ void test3() {
if (taosCheckExistFile(s)) { if (taosCheckExistFile(s)) {
printf("%s file: %s already exist! \n", (char*)__FUNCTION__, s); printf("%s file: %s already exist! \n", (char*)__FUNCTION__, s);
} else { } else {
raftCfgCreateFile(pCfg, 7, s); SRaftCfgMeta meta;
meta.isStandBy = 7;
meta.snapshotEnable = 9;
raftCfgCreateFile(pCfg, meta, s);
printf("%s create json file: %s \n", (char*)__FUNCTION__, s); printf("%s create json file: %s \n", (char*)__FUNCTION__, s);
} }
...@@ -94,6 +97,7 @@ void test5() { ...@@ -94,6 +97,7 @@ void test5() {
pCfg->cfg.myIndex = taosGetTimestampSec(); pCfg->cfg.myIndex = taosGetTimestampSec();
pCfg->isStandBy += 2; pCfg->isStandBy += 2;
pCfg->snapshotEnable += 3;
raftCfgPersist(pCfg); raftCfgPersist(pCfg);
printf("%s update json file: %s myIndex->%d \n", (char*)__FUNCTION__, "./test3_raft_cfg.json", pCfg->cfg.myIndex); printf("%s update json file: %s myIndex->%d \n", (char*)__FUNCTION__, "./test3_raft_cfg.json", pCfg->cfg.myIndex);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册