提交 83b639ae 编写于 作者: M Minghao Li

refactor(sync): persist batch size

上级 6ec47a7a
...@@ -36,6 +36,7 @@ typedef struct SRaftCfg { ...@@ -36,6 +36,7 @@ typedef struct SRaftCfg {
TdFilePtr pFile; TdFilePtr pFile;
char path[TSDB_FILENAME_LEN * 2]; char path[TSDB_FILENAME_LEN * 2];
int8_t isStandBy; int8_t isStandBy;
int32_t batchSize;
int8_t snapshotStrategy; int8_t snapshotStrategy;
SyncIndex lastConfigIndex; SyncIndex lastConfigIndex;
...@@ -62,6 +63,7 @@ int32_t raftCfgFromStr(const char *s, SRaftCfg *pRaftCfg); ...@@ -62,6 +63,7 @@ int32_t raftCfgFromStr(const char *s, SRaftCfg *pRaftCfg);
typedef struct SRaftCfgMeta { typedef struct SRaftCfgMeta {
int8_t isStandBy; int8_t isStandBy;
int32_t batchSize;
int8_t snapshotStrategy; int8_t snapshotStrategy;
SyncIndex lastConfigIndex; SyncIndex lastConfigIndex;
} SRaftCfgMeta; } SRaftCfgMeta;
......
...@@ -834,7 +834,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc ...@@ -834,7 +834,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
// //
// operation: // operation:
// if hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex, append entry // if hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex, append entry
// match my-commit-index or my-commit-index + 1 // match my-commit-index or my-commit-index + batchSize
do { do {
bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) &&
(pMsg->prevLogIndex <= ths->commitIndex); (pMsg->prevLogIndex <= ths->commitIndex);
...@@ -928,11 +928,13 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc ...@@ -928,11 +928,13 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
bool condition = condition1 || condition2; bool condition = condition1 || condition2;
if (condition) { if (condition) {
char logBuf[128]; do {
snprintf(logBuf, sizeof(logBuf), char logBuf[128];
"recv sync-append-entries-batch, not match, pre-index:%ld, pre-term:%lu, datalen:%d", pMsg->prevLogIndex, snprintf(logBuf, sizeof(logBuf),
pMsg->prevLogTerm, pMsg->dataLen); "recv sync-append-entries-batch, not match, pre-index:%ld, pre-term:%lu, datalen:%d",
syncNodeEventLog(ths, logBuf); pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
syncNodeEventLog(ths, logBuf);
} while (0);
// prepare response msg // prepare response msg
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
......
...@@ -109,19 +109,30 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p ...@@ -109,19 +109,30 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p
} }
// only start once // only start once
static void syncNodeStartSnapshot(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, SyncTerm lastApplyTerm, static void syncNodeStartSnapshotOnce(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, SyncTerm lastApplyTerm,
SyncAppendEntriesReply* pMsg) { SyncAppendEntriesReply* pMsg) {
// get sender // get sender
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId)); SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId));
ASSERT(pSender != NULL); ASSERT(pSender != NULL);
if (snapshotSenderIsStart(pSender)) {
do {
char* eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender already start");
syncNodeErrorLog(ths, eventLog);
taosMemoryFree(eventLog);
} while (0);
return;
}
SSnapshot snapshot = { SSnapshot snapshot = {
.data = NULL, .lastApplyIndex = endIndex, .lastApplyTerm = lastApplyTerm, .lastConfigIndex = SYNC_INDEX_INVALID}; .data = NULL, .lastApplyIndex = endIndex, .lastApplyTerm = lastApplyTerm, .lastConfigIndex = SYNC_INDEX_INVALID};
void* pReader = NULL; void* pReader = NULL;
SSnapshotParam readerParam = {.start = beginIndex, .end = endIndex}; SSnapshotParam readerParam = {.start = beginIndex, .end = endIndex};
ths->pFsm->FpSnapshotStartRead(ths->pFsm, &readerParam, &pReader); int32_t code = ths->pFsm->FpSnapshotStartRead(ths->pFsm, &readerParam, &pReader);
if (!snapshotSenderIsStart(pSender) && pMsg->privateTerm < pSender->privateTerm) { ASSERT(code == 0);
if (pMsg->privateTerm < pSender->privateTerm) {
ASSERT(pReader != NULL); ASSERT(pReader != NULL);
snapshotSenderStart(pSender, readerParam, snapshot, pReader); snapshotSenderStart(pSender, readerParam, snapshot, pReader);
...@@ -178,7 +189,9 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie ...@@ -178,7 +189,9 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
// start snapshot <match+1, old snapshot.end> // start snapshot <match+1, old snapshot.end>
SSnapshot oldSnapshot; SSnapshot oldSnapshot;
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &oldSnapshot); ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &oldSnapshot);
syncNodeStartSnapshot(ths, newMatchIndex + 1, oldSnapshot.lastApplyIndex, oldSnapshot.lastApplyTerm, pMsg); ASSERT(oldSnapshot.lastApplyIndex >= newMatchIndex + 1);
syncNodeStartSnapshotOnce(ths, newMatchIndex + 1, oldSnapshot.lastApplyIndex, oldSnapshot.lastApplyTerm,
pMsg); // term maybe not ok?
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), oldSnapshot.lastApplyIndex + 1); syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), oldSnapshot.lastApplyIndex + 1);
syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), newMatchIndex); syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), newMatchIndex);
...@@ -187,7 +200,6 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie ...@@ -187,7 +200,6 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
} else { } else {
SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
// notice! int64, uint64
if (nextIndex > SYNC_INDEX_BEGIN) { if (nextIndex > SYNC_INDEX_BEGIN) {
--nextIndex; --nextIndex;
...@@ -198,7 +210,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie ...@@ -198,7 +210,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
SSyncRaftEntry* pEntry; SSyncRaftEntry* pEntry;
int32_t code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, nextIndex, &pEntry); int32_t code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, nextIndex, &pEntry);
ASSERT(code == 0); ASSERT(code == 0);
syncNodeStartSnapshot(ths, SYNC_INDEX_BEGIN, nextIndex, pEntry->term, pMsg); syncNodeStartSnapshotOnce(ths, SYNC_INDEX_BEGIN, nextIndex, pEntry->term, pMsg);
// get sender // get sender
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId)); SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId));
......
...@@ -846,6 +846,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { ...@@ -846,6 +846,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
meta.isStandBy = pSyncInfo->isStandBy; meta.isStandBy = pSyncInfo->isStandBy;
meta.snapshotStrategy = pSyncInfo->snapshotStrategy; meta.snapshotStrategy = pSyncInfo->snapshotStrategy;
meta.lastConfigIndex = SYNC_INDEX_INVALID; meta.lastConfigIndex = SYNC_INDEX_INVALID;
meta.batchSize = pSyncInfo->batchSize;
ret = raftCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), meta, pSyncNode->configPath); ret = raftCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), meta, pSyncNode->configPath);
ASSERT(ret == 0); ASSERT(ret == 0);
......
...@@ -183,6 +183,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) { ...@@ -183,6 +183,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
cJSON_AddItemToObject(pRoot, "SSyncCfg", syncCfg2Json(&(pRaftCfg->cfg))); cJSON_AddItemToObject(pRoot, "SSyncCfg", syncCfg2Json(&(pRaftCfg->cfg)));
cJSON_AddNumberToObject(pRoot, "isStandBy", pRaftCfg->isStandBy); cJSON_AddNumberToObject(pRoot, "isStandBy", pRaftCfg->isStandBy);
cJSON_AddNumberToObject(pRoot, "snapshotStrategy", pRaftCfg->snapshotStrategy); cJSON_AddNumberToObject(pRoot, "snapshotStrategy", pRaftCfg->snapshotStrategy);
cJSON_AddNumberToObject(pRoot, "batchSize", pRaftCfg->batchSize);
char buf64[128]; char buf64[128];
snprintf(buf64, sizeof(buf64), "%ld", pRaftCfg->lastConfigIndex); snprintf(buf64, sizeof(buf64), "%ld", pRaftCfg->lastConfigIndex);
...@@ -228,6 +229,7 @@ int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path) { ...@@ -228,6 +229,7 @@ int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path) {
SRaftCfg raftCfg; SRaftCfg raftCfg;
raftCfg.cfg = *pCfg; raftCfg.cfg = *pCfg;
raftCfg.isStandBy = meta.isStandBy; raftCfg.isStandBy = meta.isStandBy;
raftCfg.batchSize = meta.batchSize;
raftCfg.snapshotStrategy = meta.snapshotStrategy; raftCfg.snapshotStrategy = meta.snapshotStrategy;
raftCfg.lastConfigIndex = meta.lastConfigIndex; raftCfg.lastConfigIndex = meta.lastConfigIndex;
raftCfg.configIndexCount = 1; raftCfg.configIndexCount = 1;
...@@ -257,6 +259,9 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) { ...@@ -257,6 +259,9 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) {
cJSON *pJsonIsStandBy = cJSON_GetObjectItem(pJson, "isStandBy"); cJSON *pJsonIsStandBy = cJSON_GetObjectItem(pJson, "isStandBy");
pRaftCfg->isStandBy = cJSON_GetNumberValue(pJsonIsStandBy); pRaftCfg->isStandBy = cJSON_GetNumberValue(pJsonIsStandBy);
cJSON *pJsonBatchSize = cJSON_GetObjectItem(pJson, "batchSize");
pRaftCfg->batchSize = cJSON_GetNumberValue(pJsonBatchSize);
cJSON *pJsonSnapshotStrategy = cJSON_GetObjectItem(pJson, "snapshotStrategy"); cJSON *pJsonSnapshotStrategy = cJSON_GetObjectItem(pJson, "snapshotStrategy");
pRaftCfg->snapshotStrategy = cJSON_GetNumberValue(pJsonSnapshotStrategy); pRaftCfg->snapshotStrategy = cJSON_GetNumberValue(pJsonSnapshotStrategy);
......
...@@ -26,6 +26,7 @@ SRaftCfg* createRaftCfg() { ...@@ -26,6 +26,7 @@ SRaftCfg* createRaftCfg() {
snprintf(((pCfg->cfg.nodeInfo)[i]).nodeFqdn, sizeof(((pCfg->cfg.nodeInfo)[i]).nodeFqdn), "100.200.300.%d", i); snprintf(((pCfg->cfg.nodeInfo)[i]).nodeFqdn, sizeof(((pCfg->cfg.nodeInfo)[i]).nodeFqdn), "100.200.300.%d", i);
} }
pCfg->isStandBy = taosGetTimestampSec() % 100; pCfg->isStandBy = taosGetTimestampSec() % 100;
pCfg->batchSize = taosGetTimestampSec() % 100;
pCfg->configIndexCount = 5; pCfg->configIndexCount = 5;
for (int i = 0; i < MAX_CONFIG_INDEX_COUNT; ++i) { for (int i = 0; i < MAX_CONFIG_INDEX_COUNT; ++i) {
...@@ -84,6 +85,7 @@ void test3() { ...@@ -84,6 +85,7 @@ void test3() {
SRaftCfgMeta meta; SRaftCfgMeta meta;
meta.isStandBy = 7; meta.isStandBy = 7;
meta.snapshotStrategy = 9; meta.snapshotStrategy = 9;
meta.batchSize = 10;
meta.lastConfigIndex = 789; meta.lastConfigIndex = 789;
raftCfgCreateFile(pCfg, meta, s); raftCfgCreateFile(pCfg, meta, s);
printf("%s create json file: %s \n", (char*)__FUNCTION__, s); printf("%s create json file: %s \n", (char*)__FUNCTION__, s);
...@@ -109,6 +111,7 @@ void test5() { ...@@ -109,6 +111,7 @@ void test5() {
pCfg->cfg.myIndex = taosGetTimestampSec(); pCfg->cfg.myIndex = taosGetTimestampSec();
pCfg->isStandBy += 2; pCfg->isStandBy += 2;
pCfg->snapshotStrategy += 3; pCfg->snapshotStrategy += 3;
pCfg->batchSize += 4;
pCfg->lastConfigIndex += 1000; pCfg->lastConfigIndex += 1000;
pCfg->configIndexCount = 5; pCfg->configIndexCount = 5;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册