提交 cd4bc6e3 编写于 作者: M Minghao Li

refactor(sync): snapshot strategy

上级 8a4e251b
......@@ -36,7 +36,7 @@ typedef struct SRaftCfg {
TdFilePtr pFile;
char path[TSDB_FILENAME_LEN * 2];
int8_t isStandBy;
int8_t snapshotEnable;
int8_t snapshotStrategy;
SyncIndex lastConfigIndex;
SyncIndex configIndexArr[MAX_CONFIG_INDEX_COUNT];
......@@ -49,20 +49,20 @@ int32_t raftCfgClose(SRaftCfg *pRaftCfg);
int32_t raftCfgPersist(SRaftCfg *pRaftCfg);
int32_t raftCfgAddConfigIndex(SRaftCfg *pRaftCfg, SyncIndex configIndex);
cJSON * syncCfg2Json(SSyncCfg *pSyncCfg);
char * syncCfg2Str(SSyncCfg *pSyncCfg);
char * syncCfg2SimpleStr(SSyncCfg *pSyncCfg);
cJSON *syncCfg2Json(SSyncCfg *pSyncCfg);
char *syncCfg2Str(SSyncCfg *pSyncCfg);
char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg);
int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg);
int32_t syncCfgFromStr(const char *s, SSyncCfg *pSyncCfg);
cJSON * raftCfg2Json(SRaftCfg *pRaftCfg);
char * raftCfg2Str(SRaftCfg *pRaftCfg);
cJSON *raftCfg2Json(SRaftCfg *pRaftCfg);
char *raftCfg2Str(SRaftCfg *pRaftCfg);
int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg);
int32_t raftCfgFromStr(const char *s, SRaftCfg *pRaftCfg);
typedef struct SRaftCfgMeta {
int8_t isStandBy;
int8_t snapshotEnable;
int8_t snapshotStrategy;
SyncIndex lastConfigIndex;
} SRaftCfgMeta;
......
......@@ -96,12 +96,20 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
return ret;
}
if (pSyncNode->pRaftCfg->snapshotEnable) {
ret = syncNodeRequestVotePeersSnapshot(pSyncNode);
} else {
ret = syncNodeRequestVotePeers(pSyncNode);
switch (pSyncNode->pRaftCfg->snapshotStrategy) {
case SYNC_STRATEGY_NO_SNAPSHOT:
ret = syncNodeRequestVotePeers(pSyncNode);
break;
case SYNC_STRATEGY_STANDARD_SNAPSHOT:
case SYNC_STRATEGY_WAL_FIRST:
ret = syncNodeRequestVotePeersSnapshot(pSyncNode);
break;
default:
ret = syncNodeRequestVotePeers(pSyncNode);
break;
}
ASSERT(ret == 0);
syncNodeResetElectTimer(pSyncNode);
......
......@@ -742,7 +742,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
if (pSyncNode->changing && pMsg->msgType != TDMT_SYNC_CONFIG_CHANGE_FINISH) {
ret = -1;
terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
sError("sync propose not ready, type:%s,%d", TMSG_INFO(pMsg->msgType), pMsg->msgType);
sError("vgId:%d, sync propose not ready, type:%s,%d", pSyncNode->vgId, TMSG_INFO(pMsg->msgType), pMsg->msgType);
goto _END;
}
......@@ -751,7 +751,8 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
if (!syncNodeCanChange(pSyncNode)) {
ret = -1;
terrno = TSDB_CODE_SYN_RECONFIG_NOT_READY;
sError("sync reconfig not ready, type:%s,%d", TMSG_INFO(pMsg->msgType), pMsg->msgType);
sError("vgId:%d, sync reconfig not ready, type:%s,%d", pSyncNode->vgId, TMSG_INFO(pMsg->msgType),
pMsg->msgType);
goto _END;
}
......@@ -792,7 +793,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
} else {
ret = -1;
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
sError("enqueue msg error, FpEqMsg is NULL");
sError("vgId:%d, enqueue msg error, FpEqMsg is NULL", pSyncNode->vgId);
}
}
......@@ -802,7 +803,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
} else {
ret = -1;
terrno = TSDB_CODE_SYN_NOT_LEADER;
sError("sync propose not leader, %s", syncUtilState2String(pSyncNode->state));
sError("vgId:%d, sync propose not leader, %s", pSyncNode->vgId, syncUtilState2String(pSyncNode->state));
goto _END;
}
......@@ -832,7 +833,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
// create a new raft config file
SRaftCfgMeta meta;
meta.isStandBy = pSyncInfo->isStandBy;
meta.snapshotEnable = pSyncInfo->snapshotStrategy;
meta.snapshotStrategy = pSyncInfo->snapshotStrategy;
meta.lastConfigIndex = SYNC_INDEX_INVALID;
ret = raftCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), meta, pSyncNode->configPath);
ASSERT(ret == 0);
......@@ -981,7 +982,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
pSyncNode->FpOnSnapshotSend = syncNodeOnSnapshotSendCb;
pSyncNode->FpOnSnapshotRsp = syncNodeOnSnapshotRspCb;
if (pSyncNode->pRaftCfg->snapshotEnable) {
if (pSyncNode->pRaftCfg->snapshotStrategy) {
sInfo("sync node use snapshot");
pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteSnapshotCb;
pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplySnapshotCb;
......@@ -1119,7 +1120,7 @@ void syncNodeClose(SSyncNode* pSyncNode) {
// option
// bool syncNodeSnapshotEnable(SSyncNode* pSyncNode) { return pSyncNode->pRaftCfg->snapshotEnable; }
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) { return pSyncNode->pRaftCfg->snapshotEnable; }
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) { return pSyncNode->pRaftCfg->snapshotStrategy; }
// ping --------------
int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) {
......@@ -2508,6 +2509,15 @@ int32_t syncNodeOnClientRequestBatchCb(SSyncNode* ths, SyncClientRequestBatch* p
SWal* pWal = pData->pWal;
walFsync(pWal, true);
if (ths->replicaNum > 1) {
// if mulit replica, start replicate right now
syncNodeReplicate(ths);
} else if (ths->replicaNum == 1) {
// one replica
syncMaybeAdvanceCommitIndex(ths);
}
return 0;
}
......
......@@ -101,7 +101,7 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) {
char *syncCfg2Str(SSyncCfg *pSyncCfg) {
cJSON *pJson = syncCfg2Json(pSyncCfg);
char * serialized = cJSON_Print(pJson);
char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
......@@ -109,7 +109,7 @@ char *syncCfg2Str(SSyncCfg *pSyncCfg) {
char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg) {
if (pSyncCfg != NULL) {
int32_t len = 512;
char * s = taosMemoryMalloc(len);
char *s = taosMemoryMalloc(len);
memset(s, 0, len);
snprintf(s, len, "{replica-num:%d, my-index:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex);
......@@ -182,7 +182,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
cJSON *pRoot = cJSON_CreateObject();
cJSON_AddItemToObject(pRoot, "SSyncCfg", syncCfg2Json(&(pRaftCfg->cfg)));
cJSON_AddNumberToObject(pRoot, "isStandBy", pRaftCfg->isStandBy);
cJSON_AddNumberToObject(pRoot, "snapshotEnable", pRaftCfg->snapshotEnable);
cJSON_AddNumberToObject(pRoot, "snapshotStrategy", pRaftCfg->snapshotStrategy);
char buf64[128];
snprintf(buf64, sizeof(buf64), "%ld", pRaftCfg->lastConfigIndex);
......@@ -205,7 +205,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
char *raftCfg2Str(SRaftCfg *pRaftCfg) {
cJSON *pJson = raftCfg2Json(pRaftCfg);
char * serialized = cJSON_Print(pJson);
char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
......@@ -228,7 +228,7 @@ int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path) {
SRaftCfg raftCfg;
raftCfg.cfg = *pCfg;
raftCfg.isStandBy = meta.isStandBy;
raftCfg.snapshotEnable = meta.snapshotEnable;
raftCfg.snapshotStrategy = meta.snapshotStrategy;
raftCfg.lastConfigIndex = meta.lastConfigIndex;
raftCfg.configIndexCount = 1;
memset(raftCfg.configIndexArr, 0, sizeof(raftCfg.configIndexArr));
......@@ -257,8 +257,8 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) {
cJSON *pJsonIsStandBy = cJSON_GetObjectItem(pJson, "isStandBy");
pRaftCfg->isStandBy = cJSON_GetNumberValue(pJsonIsStandBy);
cJSON *pJsonSnapshotEnable = cJSON_GetObjectItem(pJson, "snapshotEnable");
pRaftCfg->snapshotEnable = cJSON_GetNumberValue(pJsonSnapshotEnable);
cJSON *pJsonSnapshotStrategy = cJSON_GetObjectItem(pJson, "snapshotStrategy");
pRaftCfg->snapshotStrategy = cJSON_GetNumberValue(pJsonSnapshotStrategy);
cJSON *pJsonLastConfigIndex = cJSON_GetObjectItem(pJson, "lastConfigIndex");
pRaftCfg->lastConfigIndex = atoll(cJSON_GetStringValue(pJsonLastConfigIndex));
......@@ -280,7 +280,7 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) {
(pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring);
}
cJSON * pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
cJSON *pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg));
ASSERT(code == 0);
......
......@@ -132,10 +132,6 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex);
SyncTerm preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex);
if (preLogTerm == SYNC_TERM_INVALID) {
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId);
ASSERT(pSender != NULL);
ASSERT(!snapshotSenderIsStart(pSender));
SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1;
syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, newNextIndex);
syncIndexMgrSetIndex(pSyncNode->pMatchIndex, pDestId, SYNC_INDEX_INVALID);
......@@ -145,26 +141,32 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
return -1;
}
// entry pointer array
SSyncRaftEntry* entryPArr[SYNC_MAX_BATCH_SIZE];
memset(entryPArr, 0, sizeof(entryPArr));
// get entry batch
int32_t getCount = 0;
SyncIndex getEntryIndex = nextIndex;
for (int32_t i = 0; i < pSyncNode->batchSize; ++i) {
SSyncRaftEntry* pEntry;
SSyncRaftEntry* pEntry = NULL;
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, getEntryIndex, &pEntry);
if (code == 0) {
ASSERT(pEntry != NULL);
entryPArr[i] = pEntry;
getCount++;
getEntryIndex++;
} else {
break;
}
}
// build msg
SyncAppendEntriesBatch* pMsg = syncAppendEntriesBatchBuild(entryPArr, getCount, pSyncNode->vgId);
ASSERT(pMsg != NULL);
// free entries
for (int32_t i = 0; i < pSyncNode->batchSize; ++i) {
SSyncRaftEntry* pEntry = entryPArr[i];
if (pEntry != NULL) {
......@@ -197,12 +199,6 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
syncIndexMgrLog2("begin append entries peers pNextIndex:", pSyncNode->pNextIndex);
syncIndexMgrLog2("begin append entries peers pMatchIndex:", pSyncNode->pMatchIndex);
logStoreSimpleLog2("begin append entries peers LogStore:", pSyncNode->pLogStore);
if (gRaftDetailLog) {
SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
sTrace("begin append entries peers, snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu",
snapshot.lastApplyIndex, snapshot.lastApplyTerm);
}
int32_t ret = 0;
for (int i = 0; i < pSyncNode->peersNum; ++i) {
......@@ -224,9 +220,6 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
return -1;
}
// batch optimized
// SyncIndex lastIndex = syncUtilMinIndex(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore), nextIndex);
// prepare entry
SyncAppendEntries* pMsg = NULL;
......@@ -283,11 +276,24 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode) {
// start replicate
int32_t ret = 0;
if (pSyncNode->pRaftCfg->snapshotEnable) {
ret = syncNodeAppendEntriesPeersSnapshot(pSyncNode);
} else {
ret = syncNodeAppendEntriesPeers(pSyncNode);
switch (pSyncNode->pRaftCfg->snapshotStrategy) {
case SYNC_STRATEGY_NO_SNAPSHOT:
ret = syncNodeAppendEntriesPeers(pSyncNode);
break;
case SYNC_STRATEGY_STANDARD_SNAPSHOT:
ret = syncNodeAppendEntriesPeersSnapshot(pSyncNode);
break;
case SYNC_STRATEGY_WAL_FIRST:
ret = syncNodeAppendEntriesPeersSnapshot2(pSyncNode);
break;
default:
ret = syncNodeAppendEntriesPeers(pSyncNode);
break;
}
return ret;
}
......
......@@ -83,7 +83,7 @@ void test3() {
} else {
SRaftCfgMeta meta;
meta.isStandBy = 7;
meta.snapshotEnable = 9;
meta.snapshotStrategy = 9;
meta.lastConfigIndex = 789;
raftCfgCreateFile(pCfg, meta, s);
printf("%s create json file: %s \n", (char*)__FUNCTION__, s);
......@@ -108,7 +108,7 @@ void test5() {
pCfg->cfg.myIndex = taosGetTimestampSec();
pCfg->isStandBy += 2;
pCfg->snapshotEnable += 3;
pCfg->snapshotStrategy += 3;
pCfg->lastConfigIndex += 1000;
pCfg->configIndexCount = 5;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册