未验证 提交 cd71d869 编写于 作者: L Li Minghao 提交者: GitHub

Merge pull request #14584 from taosdata/feature/3.0_mhli

refactor(sync): snapshot strategy
...@@ -26,7 +26,7 @@ extern "C" { ...@@ -26,7 +26,7 @@ extern "C" {
extern bool gRaftDetailLog; extern bool gRaftDetailLog;
#define SYNC_MAX_BATCH_SIZE 100 #define SYNC_MAX_BATCH_SIZE 500
#define SYNC_INDEX_BEGIN 0 #define SYNC_INDEX_BEGIN 0
#define SYNC_INDEX_INVALID -1 #define SYNC_INDEX_INVALID -1
#define SYNC_TERM_INVALID 0xFFFFFFFFFFFFFFFF #define SYNC_TERM_INVALID 0xFFFFFFFFFFFFFFFF
......
...@@ -36,7 +36,7 @@ typedef struct SRaftCfg { ...@@ -36,7 +36,7 @@ typedef struct SRaftCfg {
TdFilePtr pFile; TdFilePtr pFile;
char path[TSDB_FILENAME_LEN * 2]; char path[TSDB_FILENAME_LEN * 2];
int8_t isStandBy; int8_t isStandBy;
int8_t snapshotEnable; int8_t snapshotStrategy;
SyncIndex lastConfigIndex; SyncIndex lastConfigIndex;
SyncIndex configIndexArr[MAX_CONFIG_INDEX_COUNT]; SyncIndex configIndexArr[MAX_CONFIG_INDEX_COUNT];
...@@ -49,20 +49,20 @@ int32_t raftCfgClose(SRaftCfg *pRaftCfg); ...@@ -49,20 +49,20 @@ int32_t raftCfgClose(SRaftCfg *pRaftCfg);
int32_t raftCfgPersist(SRaftCfg *pRaftCfg); int32_t raftCfgPersist(SRaftCfg *pRaftCfg);
int32_t raftCfgAddConfigIndex(SRaftCfg *pRaftCfg, SyncIndex configIndex); int32_t raftCfgAddConfigIndex(SRaftCfg *pRaftCfg, SyncIndex configIndex);
cJSON * syncCfg2Json(SSyncCfg *pSyncCfg); cJSON *syncCfg2Json(SSyncCfg *pSyncCfg);
char * syncCfg2Str(SSyncCfg *pSyncCfg); char *syncCfg2Str(SSyncCfg *pSyncCfg);
char * syncCfg2SimpleStr(SSyncCfg *pSyncCfg); char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg);
int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg); int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg);
int32_t syncCfgFromStr(const char *s, SSyncCfg *pSyncCfg); int32_t syncCfgFromStr(const char *s, SSyncCfg *pSyncCfg);
cJSON * raftCfg2Json(SRaftCfg *pRaftCfg); cJSON *raftCfg2Json(SRaftCfg *pRaftCfg);
char * raftCfg2Str(SRaftCfg *pRaftCfg); char *raftCfg2Str(SRaftCfg *pRaftCfg);
int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg); int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg);
int32_t raftCfgFromStr(const char *s, SRaftCfg *pRaftCfg); int32_t raftCfgFromStr(const char *s, SRaftCfg *pRaftCfg);
typedef struct SRaftCfgMeta { typedef struct SRaftCfgMeta {
int8_t isStandBy; int8_t isStandBy;
int8_t snapshotEnable; int8_t snapshotStrategy;
SyncIndex lastConfigIndex; SyncIndex lastConfigIndex;
} SRaftCfgMeta; } SRaftCfgMeta;
......
...@@ -96,12 +96,20 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) { ...@@ -96,12 +96,20 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
return ret; return ret;
} }
if (pSyncNode->pRaftCfg->snapshotEnable) { switch (pSyncNode->pRaftCfg->snapshotStrategy) {
ret = syncNodeRequestVotePeersSnapshot(pSyncNode); case SYNC_STRATEGY_NO_SNAPSHOT:
} else { ret = syncNodeRequestVotePeers(pSyncNode);
ret = syncNodeRequestVotePeers(pSyncNode); break;
case SYNC_STRATEGY_STANDARD_SNAPSHOT:
case SYNC_STRATEGY_WAL_FIRST:
ret = syncNodeRequestVotePeersSnapshot(pSyncNode);
break;
default:
ret = syncNodeRequestVotePeers(pSyncNode);
break;
} }
ASSERT(ret == 0); ASSERT(ret == 0);
syncNodeResetElectTimer(pSyncNode); syncNodeResetElectTimer(pSyncNode);
......
...@@ -672,12 +672,12 @@ int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg* pMsgArr, bool* pIsWe ...@@ -672,12 +672,12 @@ int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg* pMsgArr, bool* pIsWe
} }
if (arrSize > SYNC_MAX_BATCH_SIZE) { if (arrSize > SYNC_MAX_BATCH_SIZE) {
syncNodeErrorLog(pSyncNode, "sync propose match batch error"); syncNodeErrorLog(pSyncNode, "sync propose batch error");
terrno = TSDB_CODE_SYN_BATCH_ERROR; terrno = TSDB_CODE_SYN_BATCH_ERROR;
return -1; return -1;
} }
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
syncNodeErrorLog(pSyncNode, "sync propose not leader"); syncNodeErrorLog(pSyncNode, "sync propose not leader");
terrno = TSDB_CODE_SYN_NOT_LEADER; terrno = TSDB_CODE_SYN_NOT_LEADER;
return -1; return -1;
...@@ -711,7 +711,7 @@ int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg* pMsgArr, bool* pIsWe ...@@ -711,7 +711,7 @@ int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg* pMsgArr, bool* pIsWe
// enqueue msg ok // enqueue msg ok
} else { } else {
sError("enqueue msg error, FpEqMsg is NULL"); sError("vgId:%d, enqueue msg error, FpEqMsg is NULL", pSyncNode->vgId);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1; return -1;
} }
...@@ -730,7 +730,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { ...@@ -730,7 +730,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
if (pSyncNode->changing && pMsg->msgType != TDMT_SYNC_CONFIG_CHANGE_FINISH) { if (pSyncNode->changing && pMsg->msgType != TDMT_SYNC_CONFIG_CHANGE_FINISH) {
ret = -1; ret = -1;
terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY; terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
sError("sync propose not ready, type:%s,%d", TMSG_INFO(pMsg->msgType), pMsg->msgType); sError("vgId:%d, sync propose not ready, type:%s,%d", pSyncNode->vgId, TMSG_INFO(pMsg->msgType), pMsg->msgType);
goto _END; goto _END;
} }
...@@ -739,7 +739,8 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { ...@@ -739,7 +739,8 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
if (!syncNodeCanChange(pSyncNode)) { if (!syncNodeCanChange(pSyncNode)) {
ret = -1; ret = -1;
terrno = TSDB_CODE_SYN_RECONFIG_NOT_READY; terrno = TSDB_CODE_SYN_RECONFIG_NOT_READY;
sError("sync reconfig not ready, type:%s,%d", TMSG_INFO(pMsg->msgType), pMsg->msgType); sError("vgId:%d, sync reconfig not ready, type:%s,%d", pSyncNode->vgId, TMSG_INFO(pMsg->msgType),
pMsg->msgType);
goto _END; goto _END;
} }
...@@ -780,7 +781,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { ...@@ -780,7 +781,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
} else { } else {
ret = -1; ret = -1;
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
sError("enqueue msg error, FpEqMsg is NULL"); sError("vgId:%d, enqueue msg error, FpEqMsg is NULL", pSyncNode->vgId);
} }
} }
...@@ -790,7 +791,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { ...@@ -790,7 +791,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
} else { } else {
ret = -1; ret = -1;
terrno = TSDB_CODE_SYN_NOT_LEADER; terrno = TSDB_CODE_SYN_NOT_LEADER;
sError("sync propose not leader, %s", syncUtilState2String(pSyncNode->state)); sError("vgId:%d, sync propose not leader, %s", pSyncNode->vgId, syncUtilState2String(pSyncNode->state));
goto _END; goto _END;
} }
...@@ -820,7 +821,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { ...@@ -820,7 +821,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
// create a new raft config file // create a new raft config file
SRaftCfgMeta meta; SRaftCfgMeta meta;
meta.isStandBy = pSyncInfo->isStandBy; meta.isStandBy = pSyncInfo->isStandBy;
meta.snapshotEnable = pSyncInfo->snapshotStrategy; meta.snapshotStrategy = pSyncInfo->snapshotStrategy;
meta.lastConfigIndex = SYNC_INDEX_INVALID; meta.lastConfigIndex = SYNC_INDEX_INVALID;
ret = raftCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), meta, pSyncNode->configPath); ret = raftCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), meta, pSyncNode->configPath);
ASSERT(ret == 0); ASSERT(ret == 0);
...@@ -969,7 +970,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { ...@@ -969,7 +970,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
pSyncNode->FpOnSnapshotSend = syncNodeOnSnapshotSendCb; pSyncNode->FpOnSnapshotSend = syncNodeOnSnapshotSendCb;
pSyncNode->FpOnSnapshotRsp = syncNodeOnSnapshotRspCb; pSyncNode->FpOnSnapshotRsp = syncNodeOnSnapshotRspCb;
if (pSyncNode->pRaftCfg->snapshotEnable) { if (pSyncNode->pRaftCfg->snapshotStrategy) {
sInfo("sync node use snapshot"); sInfo("sync node use snapshot");
pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteSnapshotCb; pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteSnapshotCb;
pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplySnapshotCb; pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplySnapshotCb;
...@@ -1107,7 +1108,7 @@ void syncNodeClose(SSyncNode* pSyncNode) { ...@@ -1107,7 +1108,7 @@ void syncNodeClose(SSyncNode* pSyncNode) {
// option // option
// bool syncNodeSnapshotEnable(SSyncNode* pSyncNode) { return pSyncNode->pRaftCfg->snapshotEnable; } // bool syncNodeSnapshotEnable(SSyncNode* pSyncNode) { return pSyncNode->pRaftCfg->snapshotEnable; }
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) { return pSyncNode->pRaftCfg->snapshotEnable; } ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) { return pSyncNode->pRaftCfg->snapshotStrategy; }
// ping -------------- // ping --------------
int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) { int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) {
...@@ -2496,6 +2497,15 @@ int32_t syncNodeOnClientRequestBatchCb(SSyncNode* ths, SyncClientRequestBatch* p ...@@ -2496,6 +2497,15 @@ int32_t syncNodeOnClientRequestBatchCb(SSyncNode* ths, SyncClientRequestBatch* p
SWal* pWal = pData->pWal; SWal* pWal = pData->pWal;
walFsync(pWal, true); walFsync(pWal, true);
if (ths->replicaNum > 1) {
// if mulit replica, start replicate right now
syncNodeReplicate(ths);
} else if (ths->replicaNum == 1) {
// one replica
syncMaybeAdvanceCommitIndex(ths);
}
return 0; return 0;
} }
......
...@@ -101,7 +101,7 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) { ...@@ -101,7 +101,7 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) {
char *syncCfg2Str(SSyncCfg *pSyncCfg) { char *syncCfg2Str(SSyncCfg *pSyncCfg) {
cJSON *pJson = syncCfg2Json(pSyncCfg); cJSON *pJson = syncCfg2Json(pSyncCfg);
char * serialized = cJSON_Print(pJson); char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }
...@@ -109,7 +109,7 @@ char *syncCfg2Str(SSyncCfg *pSyncCfg) { ...@@ -109,7 +109,7 @@ char *syncCfg2Str(SSyncCfg *pSyncCfg) {
char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg) { char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg) {
if (pSyncCfg != NULL) { if (pSyncCfg != NULL) {
int32_t len = 512; int32_t len = 512;
char * s = taosMemoryMalloc(len); char *s = taosMemoryMalloc(len);
memset(s, 0, len); memset(s, 0, len);
snprintf(s, len, "{replica-num:%d, my-index:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex); snprintf(s, len, "{replica-num:%d, my-index:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex);
...@@ -182,7 +182,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) { ...@@ -182,7 +182,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
cJSON *pRoot = cJSON_CreateObject(); cJSON *pRoot = cJSON_CreateObject();
cJSON_AddItemToObject(pRoot, "SSyncCfg", syncCfg2Json(&(pRaftCfg->cfg))); cJSON_AddItemToObject(pRoot, "SSyncCfg", syncCfg2Json(&(pRaftCfg->cfg)));
cJSON_AddNumberToObject(pRoot, "isStandBy", pRaftCfg->isStandBy); cJSON_AddNumberToObject(pRoot, "isStandBy", pRaftCfg->isStandBy);
cJSON_AddNumberToObject(pRoot, "snapshotEnable", pRaftCfg->snapshotEnable); cJSON_AddNumberToObject(pRoot, "snapshotStrategy", pRaftCfg->snapshotStrategy);
char buf64[128]; char buf64[128];
snprintf(buf64, sizeof(buf64), "%ld", pRaftCfg->lastConfigIndex); snprintf(buf64, sizeof(buf64), "%ld", pRaftCfg->lastConfigIndex);
...@@ -205,7 +205,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) { ...@@ -205,7 +205,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
char *raftCfg2Str(SRaftCfg *pRaftCfg) { char *raftCfg2Str(SRaftCfg *pRaftCfg) {
cJSON *pJson = raftCfg2Json(pRaftCfg); cJSON *pJson = raftCfg2Json(pRaftCfg);
char * serialized = cJSON_Print(pJson); char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }
...@@ -228,7 +228,7 @@ int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path) { ...@@ -228,7 +228,7 @@ int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path) {
SRaftCfg raftCfg; SRaftCfg raftCfg;
raftCfg.cfg = *pCfg; raftCfg.cfg = *pCfg;
raftCfg.isStandBy = meta.isStandBy; raftCfg.isStandBy = meta.isStandBy;
raftCfg.snapshotEnable = meta.snapshotEnable; raftCfg.snapshotStrategy = meta.snapshotStrategy;
raftCfg.lastConfigIndex = meta.lastConfigIndex; raftCfg.lastConfigIndex = meta.lastConfigIndex;
raftCfg.configIndexCount = 1; raftCfg.configIndexCount = 1;
memset(raftCfg.configIndexArr, 0, sizeof(raftCfg.configIndexArr)); memset(raftCfg.configIndexArr, 0, sizeof(raftCfg.configIndexArr));
...@@ -257,8 +257,8 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) { ...@@ -257,8 +257,8 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) {
cJSON *pJsonIsStandBy = cJSON_GetObjectItem(pJson, "isStandBy"); cJSON *pJsonIsStandBy = cJSON_GetObjectItem(pJson, "isStandBy");
pRaftCfg->isStandBy = cJSON_GetNumberValue(pJsonIsStandBy); pRaftCfg->isStandBy = cJSON_GetNumberValue(pJsonIsStandBy);
cJSON *pJsonSnapshotEnable = cJSON_GetObjectItem(pJson, "snapshotEnable"); cJSON *pJsonSnapshotStrategy = cJSON_GetObjectItem(pJson, "snapshotStrategy");
pRaftCfg->snapshotEnable = cJSON_GetNumberValue(pJsonSnapshotEnable); pRaftCfg->snapshotStrategy = cJSON_GetNumberValue(pJsonSnapshotStrategy);
cJSON *pJsonLastConfigIndex = cJSON_GetObjectItem(pJson, "lastConfigIndex"); cJSON *pJsonLastConfigIndex = cJSON_GetObjectItem(pJson, "lastConfigIndex");
pRaftCfg->lastConfigIndex = atoll(cJSON_GetStringValue(pJsonLastConfigIndex)); pRaftCfg->lastConfigIndex = atoll(cJSON_GetStringValue(pJsonLastConfigIndex));
...@@ -280,7 +280,7 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) { ...@@ -280,7 +280,7 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) {
(pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring); (pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring);
} }
cJSON * pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg"); cJSON *pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg)); int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg));
ASSERT(code == 0); ASSERT(code == 0);
......
...@@ -132,10 +132,6 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) { ...@@ -132,10 +132,6 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex); SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex);
SyncTerm preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex); SyncTerm preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex);
if (preLogTerm == SYNC_TERM_INVALID) { if (preLogTerm == SYNC_TERM_INVALID) {
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId);
ASSERT(pSender != NULL);
ASSERT(!snapshotSenderIsStart(pSender));
SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1; SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1;
syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, newNextIndex); syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, newNextIndex);
syncIndexMgrSetIndex(pSyncNode->pMatchIndex, pDestId, SYNC_INDEX_INVALID); syncIndexMgrSetIndex(pSyncNode->pMatchIndex, pDestId, SYNC_INDEX_INVALID);
...@@ -145,26 +141,32 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) { ...@@ -145,26 +141,32 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
return -1; return -1;
} }
// entry pointer array
SSyncRaftEntry* entryPArr[SYNC_MAX_BATCH_SIZE]; SSyncRaftEntry* entryPArr[SYNC_MAX_BATCH_SIZE];
memset(entryPArr, 0, sizeof(entryPArr)); memset(entryPArr, 0, sizeof(entryPArr));
// get entry batch
int32_t getCount = 0; int32_t getCount = 0;
SyncIndex getEntryIndex = nextIndex; SyncIndex getEntryIndex = nextIndex;
for (int32_t i = 0; i < pSyncNode->batchSize; ++i) { for (int32_t i = 0; i < pSyncNode->batchSize; ++i) {
SSyncRaftEntry* pEntry; SSyncRaftEntry* pEntry = NULL;
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, getEntryIndex, &pEntry); int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, getEntryIndex, &pEntry);
if (code == 0) { if (code == 0) {
ASSERT(pEntry != NULL); ASSERT(pEntry != NULL);
entryPArr[i] = pEntry; entryPArr[i] = pEntry;
getCount++; getCount++;
getEntryIndex++;
} else { } else {
break; break;
} }
} }
// build msg
SyncAppendEntriesBatch* pMsg = syncAppendEntriesBatchBuild(entryPArr, getCount, pSyncNode->vgId); SyncAppendEntriesBatch* pMsg = syncAppendEntriesBatchBuild(entryPArr, getCount, pSyncNode->vgId);
ASSERT(pMsg != NULL); ASSERT(pMsg != NULL);
// free entries
for (int32_t i = 0; i < pSyncNode->batchSize; ++i) { for (int32_t i = 0; i < pSyncNode->batchSize; ++i) {
SSyncRaftEntry* pEntry = entryPArr[i]; SSyncRaftEntry* pEntry = entryPArr[i];
if (pEntry != NULL) { if (pEntry != NULL) {
...@@ -197,12 +199,6 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) { ...@@ -197,12 +199,6 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
syncIndexMgrLog2("begin append entries peers pNextIndex:", pSyncNode->pNextIndex); syncIndexMgrLog2("begin append entries peers pNextIndex:", pSyncNode->pNextIndex);
syncIndexMgrLog2("begin append entries peers pMatchIndex:", pSyncNode->pMatchIndex); syncIndexMgrLog2("begin append entries peers pMatchIndex:", pSyncNode->pMatchIndex);
logStoreSimpleLog2("begin append entries peers LogStore:", pSyncNode->pLogStore); logStoreSimpleLog2("begin append entries peers LogStore:", pSyncNode->pLogStore);
if (gRaftDetailLog) {
SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
sTrace("begin append entries peers, snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu",
snapshot.lastApplyIndex, snapshot.lastApplyTerm);
}
int32_t ret = 0; int32_t ret = 0;
for (int i = 0; i < pSyncNode->peersNum; ++i) { for (int i = 0; i < pSyncNode->peersNum; ++i) {
...@@ -224,9 +220,6 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) { ...@@ -224,9 +220,6 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
return -1; return -1;
} }
// batch optimized
// SyncIndex lastIndex = syncUtilMinIndex(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore), nextIndex);
// prepare entry // prepare entry
SyncAppendEntries* pMsg = NULL; SyncAppendEntries* pMsg = NULL;
...@@ -283,11 +276,24 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode) { ...@@ -283,11 +276,24 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode) {
// start replicate // start replicate
int32_t ret = 0; int32_t ret = 0;
if (pSyncNode->pRaftCfg->snapshotEnable) { switch (pSyncNode->pRaftCfg->snapshotStrategy) {
ret = syncNodeAppendEntriesPeersSnapshot(pSyncNode); case SYNC_STRATEGY_NO_SNAPSHOT:
} else { ret = syncNodeAppendEntriesPeers(pSyncNode);
ret = syncNodeAppendEntriesPeers(pSyncNode); break;
case SYNC_STRATEGY_STANDARD_SNAPSHOT:
ret = syncNodeAppendEntriesPeersSnapshot(pSyncNode);
break;
case SYNC_STRATEGY_WAL_FIRST:
ret = syncNodeAppendEntriesPeersSnapshot2(pSyncNode);
break;
default:
ret = syncNodeAppendEntriesPeers(pSyncNode);
break;
} }
return ret; return ret;
} }
......
...@@ -83,7 +83,7 @@ void test3() { ...@@ -83,7 +83,7 @@ void test3() {
} else { } else {
SRaftCfgMeta meta; SRaftCfgMeta meta;
meta.isStandBy = 7; meta.isStandBy = 7;
meta.snapshotEnable = 9; meta.snapshotStrategy = 9;
meta.lastConfigIndex = 789; meta.lastConfigIndex = 789;
raftCfgCreateFile(pCfg, meta, s); raftCfgCreateFile(pCfg, meta, s);
printf("%s create json file: %s \n", (char*)__FUNCTION__, s); printf("%s create json file: %s \n", (char*)__FUNCTION__, s);
...@@ -108,7 +108,7 @@ void test5() { ...@@ -108,7 +108,7 @@ void test5() {
pCfg->cfg.myIndex = taosGetTimestampSec(); pCfg->cfg.myIndex = taosGetTimestampSec();
pCfg->isStandBy += 2; pCfg->isStandBy += 2;
pCfg->snapshotEnable += 3; pCfg->snapshotStrategy += 3;
pCfg->lastConfigIndex += 1000; pCfg->lastConfigIndex += 1000;
pCfg->configIndexCount = 5; pCfg->configIndexCount = 5;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册