提交 17f8ccc9 编写于 作者: S Shengliang Guan

enh: record config index

上级 65f65b53
...@@ -50,6 +50,10 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM ...@@ -50,6 +50,10 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
tsem_post(&pMgmt->syncSem); tsem_post(&pMgmt->syncSem);
} else { } else {
if (cbMeta.index - sdbGetApplyIndex(pMnode->pSdb) > 100) { if (cbMeta.index - sdbGetApplyIndex(pMnode->pSdb) > 100) {
SSnapshotMeta sMeta = {0};
if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) {
sdbSetCurConfig(pMnode->pSdb, sMeta.lastConfigIndex);
}
sdbWriteFile(pMnode->pSdb); sdbWriteFile(pMnode->pSdb);
} }
} }
...@@ -59,11 +63,18 @@ int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) { ...@@ -59,11 +63,18 @@ int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) {
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
pSnapshot->lastApplyIndex = sdbGetCommitIndex(pMnode->pSdb); pSnapshot->lastApplyIndex = sdbGetCommitIndex(pMnode->pSdb);
pSnapshot->lastApplyTerm = sdbGetCommitTerm(pMnode->pSdb); pSnapshot->lastApplyTerm = sdbGetCommitTerm(pMnode->pSdb);
pSnapshot->lastConfigIndex = sdbGetCurConfig(pMnode->pSdb);
return 0; return 0;
} }
void mndRestoreFinish(struct SSyncFSM *pFsm) { void mndRestoreFinish(struct SSyncFSM *pFsm) {
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
SSnapshotMeta sMeta = {0};
if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) {
sdbSetCurConfig(pMnode->pSdb, sMeta.lastConfigIndex);
}
if (!pMnode->deploy) { if (!pMnode->deploy) {
mInfo("mnode sync restore finished, and will handle outstanding transactions"); mInfo("mnode sync restore finished, and will handle outstanding transactions");
mndTransPullup(pMnode); mndTransPullup(pMnode);
......
...@@ -1388,6 +1388,10 @@ void mndTransPullup(SMnode *pMnode) { ...@@ -1388,6 +1388,10 @@ void mndTransPullup(SMnode *pMnode) {
mndReleaseTrans(pMnode, pTrans); mndReleaseTrans(pMnode, pTrans);
} }
SSnapshotMeta sMeta = {0};
if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) {
sdbSetCurConfig(pMnode->pSdb, sMeta.lastConfigIndex);
}
sdbWriteFile(pMnode->pSdb); sdbWriteFile(pMnode->pSdb);
taosArrayDestroy(pArray); taosArrayDestroy(pArray);
} }
......
...@@ -171,6 +171,7 @@ typedef struct SSdb { ...@@ -171,6 +171,7 @@ typedef struct SSdb {
int64_t lastCommitTerm; int64_t lastCommitTerm;
int64_t curVer; int64_t curVer;
int64_t curTerm; int64_t curTerm;
int64_t curConfig;
int64_t tableVer[SDB_MAX]; int64_t tableVer[SDB_MAX];
int64_t maxId[SDB_MAX]; int64_t maxId[SDB_MAX];
EKeyType keyTypes[SDB_MAX]; EKeyType keyTypes[SDB_MAX];
...@@ -359,10 +360,12 @@ int64_t sdbGetTableVer(SSdb *pSdb, ESdbType type); ...@@ -359,10 +360,12 @@ int64_t sdbGetTableVer(SSdb *pSdb, ESdbType type);
*/ */
void sdbSetApplyIndex(SSdb *pSdb, int64_t index); void sdbSetApplyIndex(SSdb *pSdb, int64_t index);
void sdbSetApplyTerm(SSdb *pSdb, int64_t term); void sdbSetApplyTerm(SSdb *pSdb, int64_t term);
void sdbSetCurConfig(SSdb *pSdb, int64_t config);
int64_t sdbGetApplyIndex(SSdb *pSdb); int64_t sdbGetApplyIndex(SSdb *pSdb);
int64_t sdbGetApplyTerm(SSdb *pSdb); int64_t sdbGetApplyTerm(SSdb *pSdb);
int64_t sdbGetCommitIndex(SSdb *pSdb); int64_t sdbGetCommitIndex(SSdb *pSdb);
int64_t sdbGetCommitTerm(SSdb *pSdb); int64_t sdbGetCommitTerm(SSdb *pSdb);
int64_t sdbGetCurConfig(SSdb *pSdb);
SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen); SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen);
void sdbFreeRaw(SSdbRaw *pRaw); void sdbFreeRaw(SSdbRaw *pRaw);
......
...@@ -161,10 +161,14 @@ void sdbSetApplyIndex(SSdb *pSdb, int64_t index) { pSdb->curVer = index; } ...@@ -161,10 +161,14 @@ void sdbSetApplyIndex(SSdb *pSdb, int64_t index) { pSdb->curVer = index; }
void sdbSetApplyTerm(SSdb *pSdb, int64_t term) { pSdb->curTerm = term; } void sdbSetApplyTerm(SSdb *pSdb, int64_t term) { pSdb->curTerm = term; }
void sdbSetCurConfig(SSdb *pSdb, int64_t config) { pSdb->curConfig = config; }
int64_t sdbGetApplyIndex(SSdb *pSdb) { return pSdb->curVer; } int64_t sdbGetApplyIndex(SSdb *pSdb) { return pSdb->curVer; }
int64_t sdbGetApplyTerm(SSdb *pSdb) { return pSdb->curTerm; } int64_t sdbGetApplyTerm(SSdb *pSdb) { return pSdb->curTerm; }
int64_t sdbGetCommitIndex(SSdb *pSdb) { return pSdb->lastCommitVer; } int64_t sdbGetCommitIndex(SSdb *pSdb) { return pSdb->lastCommitVer; }
int64_t sdbGetCommitTerm(SSdb *pSdb) { return pSdb->lastCommitTerm; } int64_t sdbGetCommitTerm(SSdb *pSdb) { return pSdb->lastCommitTerm; }
\ No newline at end of file
int64_t sdbGetCurConfig(SSdb *pSdb) { return pSdb->curConfig; }
\ No newline at end of file
...@@ -110,6 +110,16 @@ static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) { ...@@ -110,6 +110,16 @@ static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) {
return -1; return -1;
} }
ret = taosReadFile(pFile, &pSdb->curConfig, sizeof(int64_t));
if (ret < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if (ret != sizeof(int64_t)) {
terrno = TSDB_CODE_FILE_CORRUPTED;
return -1;
}
for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) { for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
int64_t maxId = 0; int64_t maxId = 0;
ret = taosReadFile(pFile, &maxId, sizeof(int64_t)); ret = taosReadFile(pFile, &maxId, sizeof(int64_t));
...@@ -173,6 +183,11 @@ static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) { ...@@ -173,6 +183,11 @@ static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) {
return -1; return -1;
} }
if (taosWriteFile(pFile, &pSdb->curConfig, sizeof(int64_t)) != sizeof(int64_t)) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) { for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
int64_t maxId = 0; int64_t maxId = 0;
if (i < SDB_MAX) { if (i < SDB_MAX) {
...@@ -288,8 +303,8 @@ static int32_t sdbReadFileImp(SSdb *pSdb) { ...@@ -288,8 +303,8 @@ static int32_t sdbReadFileImp(SSdb *pSdb) {
pSdb->lastCommitVer = pSdb->curVer; pSdb->lastCommitVer = pSdb->curVer;
pSdb->lastCommitTerm = pSdb->curTerm; pSdb->lastCommitTerm = pSdb->curTerm;
memcpy(pSdb->tableVer, tableVer, sizeof(tableVer)); memcpy(pSdb->tableVer, tableVer, sizeof(tableVer));
mDebug("read sdb file:%s successfully, ver:%" PRId64 " term:%" PRId64, file, pSdb->lastCommitVer, mDebug("read sdb file:%s successfully, index:%" PRId64 " term:%" PRId64 " config:%" PRId64, file, pSdb->lastCommitVer,
pSdb->lastCommitTerm); pSdb->lastCommitTerm, pSdb->curConfig);
_OVER: _OVER:
taosCloseFile(&pFile); taosCloseFile(&pFile);
...@@ -498,6 +513,7 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter) { ...@@ -498,6 +513,7 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter) {
taosThreadMutexLock(&pSdb->filelock); taosThreadMutexLock(&pSdb->filelock);
int64_t commitIndex = pSdb->lastCommitVer; int64_t commitIndex = pSdb->lastCommitVer;
int64_t commitTerm = pSdb->lastCommitTerm; int64_t commitTerm = pSdb->lastCommitTerm;
int64_t curConfig = pSdb->curConfig;
if (taosCopyFile(datafile, pIter->name) < 0) { if (taosCopyFile(datafile, pIter->name) < 0) {
taosThreadMutexUnlock(&pSdb->filelock); taosThreadMutexUnlock(&pSdb->filelock);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
...@@ -516,8 +532,8 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter) { ...@@ -516,8 +532,8 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter) {
} }
*ppIter = pIter; *ppIter = pIter;
mInfo("sdbiter:%p, is created to read snapshot, index:%" PRId64 " term:%" PRId64 " file:%s", pIter, commitIndex, mInfo("sdbiter:%p, is created to read snapshot, index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s", pIter,
commitTerm, pIter->name); commitIndex, commitTerm, curConfig, pIter->name);
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册