未验证 提交 1c46b8d0 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #13946 from taosdata/fix/tsim

refactor: sdb commit index
...@@ -169,11 +169,12 @@ typedef struct SSdb { ...@@ -169,11 +169,12 @@ typedef struct SSdb {
SWal *pWal; SWal *pWal;
char *currDir; char *currDir;
char *tmpDir; char *tmpDir;
int64_t lastCommitVer; int64_t commitIndex;
int64_t lastCommitTerm; int64_t commitTerm;
int64_t curVer; int64_t commitConfig;
int64_t curTerm; int64_t applyIndex;
int64_t curConfig; int64_t applyTerm;
int64_t applyConfig;
int64_t tableVer[SDB_MAX]; int64_t tableVer[SDB_MAX];
int64_t maxId[SDB_MAX]; int64_t maxId[SDB_MAX];
EKeyType keyTypes[SDB_MAX]; EKeyType keyTypes[SDB_MAX];
......
...@@ -53,11 +53,12 @@ SSdb *sdbInit(SSdbOpt *pOption) { ...@@ -53,11 +53,12 @@ SSdb *sdbInit(SSdbOpt *pOption) {
} }
pSdb->pWal = pOption->pWal; pSdb->pWal = pOption->pWal;
pSdb->curVer = -1; pSdb->applyIndex = -1;
pSdb->curTerm = -1; pSdb->applyTerm = -1;
pSdb->lastCommitVer = -1; pSdb->applyConfig = -1;
pSdb->lastCommitTerm = -1; pSdb->commitIndex = -1;
pSdb->curConfig = -1; pSdb->commitTerm = -1;
pSdb->commitConfig = -1;
pSdb->pMnode = pOption->pMnode; pSdb->pMnode = pOption->pMnode;
taosThreadMutexInit(&pSdb->filelock, NULL); taosThreadMutexInit(&pSdb->filelock, NULL);
mDebug("sdb init successfully"); mDebug("sdb init successfully");
...@@ -159,23 +160,23 @@ static int32_t sdbCreateDir(SSdb *pSdb) { ...@@ -159,23 +160,23 @@ static int32_t sdbCreateDir(SSdb *pSdb) {
return 0; return 0;
} }
void sdbSetApplyIndex(SSdb *pSdb, int64_t index) { pSdb->curVer = index; } void sdbSetApplyIndex(SSdb *pSdb, int64_t index) { pSdb->applyIndex = index; }
void sdbSetApplyTerm(SSdb *pSdb, int64_t term) { pSdb->curTerm = term; } void sdbSetApplyTerm(SSdb *pSdb, int64_t term) { pSdb->applyTerm = term; }
void sdbSetCurConfig(SSdb *pSdb, int64_t config) { void sdbSetCurConfig(SSdb *pSdb, int64_t config) {
if (pSdb->curConfig != config) { if (pSdb->applyConfig != config) {
mDebug("mnode sync config set from %" PRId64 " to %" PRId64, pSdb->curConfig, config); mDebug("mnode sync config set from %" PRId64 " to %" PRId64, pSdb->applyConfig, config);
pSdb->curConfig = config; pSdb->applyConfig = config;
} }
} }
int64_t sdbGetApplyIndex(SSdb *pSdb) { return pSdb->curVer; } int64_t sdbGetApplyIndex(SSdb *pSdb) { return pSdb->applyIndex; }
int64_t sdbGetApplyTerm(SSdb *pSdb) { return pSdb->curTerm; } int64_t sdbGetApplyTerm(SSdb *pSdb) { return pSdb->applyTerm; }
int64_t sdbGetCommitIndex(SSdb *pSdb) { return pSdb->lastCommitVer; } int64_t sdbGetCommitIndex(SSdb *pSdb) { return pSdb->commitIndex; }
int64_t sdbGetCommitTerm(SSdb *pSdb) { return pSdb->lastCommitTerm; } int64_t sdbGetCommitTerm(SSdb *pSdb) { return pSdb->commitTerm; }
int64_t sdbGetCurConfig(SSdb *pSdb) { return pSdb->curConfig; } int64_t sdbGetCurConfig(SSdb *pSdb) { return pSdb->commitConfig; }
\ No newline at end of file \ No newline at end of file
...@@ -67,10 +67,12 @@ static void sdbResetData(SSdb *pSdb) { ...@@ -67,10 +67,12 @@ static void sdbResetData(SSdb *pSdb) {
mDebug("sdb:%s is reset", sdbTableName(i)); mDebug("sdb:%s is reset", sdbTableName(i));
} }
pSdb->curVer = -1; pSdb->applyIndex = -1;
pSdb->curTerm = -1; pSdb->applyTerm = -1;
pSdb->lastCommitVer = -1; pSdb->applyConfig = -1;
pSdb->lastCommitTerm = -1; pSdb->commitIndex = -1;
pSdb->commitTerm = -1;
pSdb->commitConfig = -1;
mDebug("sdb reset successfully"); mDebug("sdb reset successfully");
} }
...@@ -90,7 +92,7 @@ static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) { ...@@ -90,7 +92,7 @@ static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) {
return -1; return -1;
} }
ret = taosReadFile(pFile, &pSdb->curVer, sizeof(int64_t)); ret = taosReadFile(pFile, &pSdb->applyIndex, sizeof(int64_t));
if (ret < 0) { if (ret < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
...@@ -100,7 +102,7 @@ static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) { ...@@ -100,7 +102,7 @@ static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) {
return -1; return -1;
} }
ret = taosReadFile(pFile, &pSdb->curTerm, sizeof(int64_t)); ret = taosReadFile(pFile, &pSdb->applyTerm, sizeof(int64_t));
if (ret < 0) { if (ret < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
...@@ -110,7 +112,7 @@ static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) { ...@@ -110,7 +112,7 @@ static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) {
return -1; return -1;
} }
ret = taosReadFile(pFile, &pSdb->curConfig, sizeof(int64_t)); ret = taosReadFile(pFile, &pSdb->applyConfig, sizeof(int64_t));
if (ret < 0) { if (ret < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
...@@ -173,17 +175,17 @@ static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) { ...@@ -173,17 +175,17 @@ static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) {
return -1; return -1;
} }
if (taosWriteFile(pFile, &pSdb->curVer, sizeof(int64_t)) != sizeof(int64_t)) { if (taosWriteFile(pFile, &pSdb->applyIndex, sizeof(int64_t)) != sizeof(int64_t)) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
if (taosWriteFile(pFile, &pSdb->curTerm, sizeof(int64_t)) != sizeof(int64_t)) { if (taosWriteFile(pFile, &pSdb->applyTerm, sizeof(int64_t)) != sizeof(int64_t)) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
if (taosWriteFile(pFile, &pSdb->curConfig, sizeof(int64_t)) != sizeof(int64_t)) { if (taosWriteFile(pFile, &pSdb->applyConfig, sizeof(int64_t)) != sizeof(int64_t)) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
...@@ -300,11 +302,12 @@ static int32_t sdbReadFileImp(SSdb *pSdb) { ...@@ -300,11 +302,12 @@ static int32_t sdbReadFileImp(SSdb *pSdb) {
} }
code = 0; code = 0;
pSdb->lastCommitVer = pSdb->curVer; pSdb->commitIndex = pSdb->applyIndex;
pSdb->lastCommitTerm = pSdb->curTerm; pSdb->commitTerm = pSdb->applyTerm;
pSdb->commitConfig = pSdb->applyConfig;
memcpy(pSdb->tableVer, tableVer, sizeof(tableVer)); memcpy(pSdb->tableVer, tableVer, sizeof(tableVer));
mDebug("read sdb file:%s successfully, index:%" PRId64 " term:%" PRId64 " config:%" PRId64, file, pSdb->lastCommitVer, mDebug("read sdb file:%s successfully, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64, file,
pSdb->lastCommitTerm, pSdb->curConfig); pSdb->commitIndex, pSdb->commitTerm, pSdb->commitConfig);
_OVER: _OVER:
taosCloseFile(&pFile); taosCloseFile(&pFile);
...@@ -336,9 +339,10 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { ...@@ -336,9 +339,10 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
char curfile[PATH_MAX] = {0}; char curfile[PATH_MAX] = {0};
snprintf(curfile, sizeof(curfile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP); snprintf(curfile, sizeof(curfile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
mDebug("start to write sdb file, current ver:%" PRId64 " term:%" PRId64 ", commit ver:%" PRId64 " term:%" PRId64 mDebug("start to write sdb file, apply index:%" PRId64 " term:%" PRId64 " config:%" PRId64 ", commit index:%" PRId64
" file:%s", " term:%" PRId64 " config:%" PRId64 ", file:%s",
pSdb->curVer, pSdb->curTerm, pSdb->lastCommitVer, pSdb->lastCommitTerm, curfile); pSdb->applyIndex, pSdb->applyTerm, pSdb->applyConfig, pSdb->commitIndex, pSdb->commitTerm, pSdb->commitConfig,
curfile);
TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) { if (pFile == NULL) {
...@@ -430,10 +434,11 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { ...@@ -430,10 +434,11 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
if (code != 0) { if (code != 0) {
mError("failed to write sdb file:%s since %s", curfile, tstrerror(code)); mError("failed to write sdb file:%s since %s", curfile, tstrerror(code));
} else { } else {
pSdb->lastCommitVer = pSdb->curVer; pSdb->commitIndex = pSdb->applyIndex;
pSdb->lastCommitTerm = pSdb->curTerm; pSdb->commitTerm = pSdb->applyTerm;
mDebug("write sdb file successfully, index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s", pSdb->commitConfig = pSdb->applyConfig;
pSdb->lastCommitVer, pSdb->lastCommitTerm, pSdb->curConfig, curfile); mDebug("write sdb file successfully, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s",
pSdb->commitIndex, pSdb->commitTerm, pSdb->commitConfig, curfile);
} }
terrno = code; terrno = code;
...@@ -442,13 +447,13 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { ...@@ -442,13 +447,13 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
int32_t sdbWriteFile(SSdb *pSdb) { int32_t sdbWriteFile(SSdb *pSdb) {
int32_t code = 0; int32_t code = 0;
if (pSdb->curVer == pSdb->lastCommitVer) { if (pSdb->applyIndex == pSdb->commitIndex) {
return 0; return 0;
} }
taosThreadMutexLock(&pSdb->filelock); taosThreadMutexLock(&pSdb->filelock);
if (pSdb->pWal != NULL) { if (pSdb->pWal != NULL) {
code = walBeginSnapshot(pSdb->pWal, pSdb->curVer); code = walBeginSnapshot(pSdb->pWal, pSdb->applyIndex);
} }
if (code == 0) { if (code == 0) {
code = sdbWriteFileImp(pSdb); code = sdbWriteFileImp(pSdb);
...@@ -522,9 +527,9 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter) { ...@@ -522,9 +527,9 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter) {
snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP); snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
taosThreadMutexLock(&pSdb->filelock); taosThreadMutexLock(&pSdb->filelock);
int64_t commitIndex = pSdb->lastCommitVer; int64_t commitIndex = pSdb->commitIndex;
int64_t commitTerm = pSdb->lastCommitTerm; int64_t commitTerm = pSdb->commitTerm;
int64_t curConfig = pSdb->curConfig; int64_t commitConfig = pSdb->commitConfig;
if (taosCopyFile(datafile, pIter->name) < 0) { if (taosCopyFile(datafile, pIter->name) < 0) {
taosThreadMutexUnlock(&pSdb->filelock); taosThreadMutexUnlock(&pSdb->filelock);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
...@@ -543,8 +548,8 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter) { ...@@ -543,8 +548,8 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter) {
} }
*ppIter = pIter; *ppIter = pIter;
mInfo("sdbiter:%p, is created to read snapshot, index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s", pIter, mInfo("sdbiter:%p, is created to read snapshot, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s",
commitIndex, commitTerm, curConfig, pIter->name); pIter, commitIndex, commitTerm, commitConfig, pIter->name);
return 0; return 0;
} }
......
...@@ -295,8 +295,9 @@ void dumpTrans(SSdb *pSdb, SJson *json) { ...@@ -295,8 +295,9 @@ void dumpTrans(SSdb *pSdb, SJson *json) {
void dumpHeader(SSdb *pSdb, SJson *json) { void dumpHeader(SSdb *pSdb, SJson *json) {
tjsonAddIntegerToObject(json, "sver", 1); tjsonAddIntegerToObject(json, "sver", 1);
tjsonAddStringToObject(json, "curVer", i642str(pSdb->curVer)); tjsonAddStringToObject(json, "applyIndex", i642str(pSdb->applyIndex));
tjsonAddStringToObject(json, "curTerm", i642str(pSdb->curTerm)); tjsonAddStringToObject(json, "applyTerm", i642str(pSdb->applyTerm));
tjsonAddStringToObject(json, "applyConfig", i642str(pSdb->applyConfig));
SJson *maxIdsJson = tjsonCreateObject(); SJson *maxIdsJson = tjsonCreateObject();
tjsonAddItemToObject(json, "maxIds", maxIdsJson); tjsonAddItemToObject(json, "maxIds", maxIdsJson);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册