未验证 提交 a860e72f 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #18701 from taosdata/FIX/TD-20390-3.0

fix: walSaveMeta ahead of deleting WAL files as gc in walEndSnapshot
...@@ -107,6 +107,8 @@ typedef struct SWal { ...@@ -107,6 +107,8 @@ typedef struct SWal {
TdFilePtr pIdxFile; TdFilePtr pIdxFile;
int32_t writeCur; int32_t writeCur;
SArray *fileInfoSet; // SArray<SWalFileInfo> SArray *fileInfoSet; // SArray<SWalFileInfo>
// gc
SArray *toDeleteFiles; // SArray<SWalFileInfo>
// status // status
int64_t totSize; int64_t totSize;
int64_t lastRollSeq; int64_t lastRollSeq;
......
...@@ -24,7 +24,9 @@ bool FORCE_INLINE walLogExist(SWal* pWal, int64_t ver) { ...@@ -24,7 +24,9 @@ bool FORCE_INLINE walLogExist(SWal* pWal, int64_t ver) {
return !walIsEmpty(pWal) && walGetFirstVer(pWal) <= ver && walGetLastVer(pWal) >= ver; return !walIsEmpty(pWal) && walGetFirstVer(pWal) <= ver && walGetLastVer(pWal) >= ver;
} }
bool FORCE_INLINE walIsEmpty(SWal* pWal) { return pWal->vers.firstVer == -1; } bool FORCE_INLINE walIsEmpty(SWal* pWal) {
return (pWal->vers.firstVer == -1 || pWal->vers.lastVer < pWal->vers.firstVer); // [firstVer, lastVer + 1)
}
int64_t FORCE_INLINE walGetFirstVer(SWal* pWal) { return pWal->vers.firstVer; } int64_t FORCE_INLINE walGetFirstVer(SWal* pWal) { return pWal->vers.firstVer; }
......
...@@ -121,7 +121,16 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { ...@@ -121,7 +121,16 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
pWal->writeCur = -1; pWal->writeCur = -1;
pWal->fileInfoSet = taosArrayInit(8, sizeof(SWalFileInfo)); pWal->fileInfoSet = taosArrayInit(8, sizeof(SWalFileInfo));
if (pWal->fileInfoSet == NULL) { if (pWal->fileInfoSet == NULL) {
wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->cfg.vgId, pWal->path, strerror(errno)); wError("vgId:%d, failed to init taosArray of fileInfoSet due to %s. path:%s", pWal->cfg.vgId, strerror(errno),
pWal->path);
goto _err;
}
// init gc
pWal->toDeleteFiles = taosArrayInit(8, sizeof(SWalFileInfo));
if (pWal->toDeleteFiles == NULL) {
wError("vgId:%d, failed to init taosArray of toDeleteFiles due to %s. path:%s", pWal->cfg.vgId, strerror(errno),
pWal->path);
goto _err; goto _err;
} }
...@@ -203,6 +212,8 @@ void walClose(SWal *pWal) { ...@@ -203,6 +212,8 @@ void walClose(SWal *pWal) {
pWal->pIdxFile = NULL; pWal->pIdxFile = NULL;
taosArrayDestroy(pWal->fileInfoSet); taosArrayDestroy(pWal->fileInfoSet);
pWal->fileInfoSet = NULL; pWal->fileInfoSet = NULL;
taosArrayDestroy(pWal->toDeleteFiles);
pWal->toDeleteFiles = NULL;
void *pIter = NULL; void *pIter = NULL;
while (1) { while (1) {
......
...@@ -489,7 +489,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) { ...@@ -489,7 +489,7 @@ int32_t walReadVer(SWalReader *pReader, int64_t ver) {
int32_t code; int32_t code;
bool seeked = false; bool seeked = false;
if (pReader->pWal->vers.firstVer == -1) { if (walIsEmpty(pReader->pWal)) {
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1; return -1;
} }
......
...@@ -120,9 +120,9 @@ int32_t walRollback(SWal *pWal, int64_t ver) { ...@@ -120,9 +120,9 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
return -1; return -1;
} }
// delete files // delete files in descending order
int fileSetSize = taosArrayGetSize(pWal->fileInfoSet); int fileSetSize = taosArrayGetSize(pWal->fileInfoSet);
for (int i = pWal->writeCur + 1; i < fileSetSize; i++) { for (int i = fileSetSize - 1; i >= pWal->writeCur + 1; i--) {
walBuildLogName(pWal, ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr); walBuildLogName(pWal, ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr);
wDebug("vgId:%d, wal remove file %s for rollback", pWal->cfg.vgId, fnameStr); wDebug("vgId:%d, wal remove file %s for rollback", pWal->cfg.vgId, fnameStr);
taosRemoveFile(fnameStr); taosRemoveFile(fnameStr);
...@@ -217,14 +217,9 @@ int32_t walRollback(SWal *pWal, int64_t ver) { ...@@ -217,14 +217,9 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
pWal->vers.lastVer = ver - 1; pWal->vers.lastVer = ver - 1;
if (pWal->vers.lastVer < pWal->vers.firstVer) { if (pWal->vers.lastVer < pWal->vers.firstVer) {
ASSERT(pWal->vers.lastVer == pWal->vers.firstVer - 1); ASSERT(pWal->vers.lastVer == pWal->vers.firstVer - 1);
pWal->vers.firstVer = -1;
} }
((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->lastVer = ver - 1; ((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->lastVer = ver - 1;
((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->fileSize = entry.offset; ((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->fileSize = entry.offset;
if (((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->lastVer < ver - 1) {
ASSERT(((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->fileSize == 0);
((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->firstVer = -1;
}
taosCloseFile(&pIdxFile); taosCloseFile(&pIdxFile);
taosCloseFile(&pLogFile); taosCloseFile(&pLogFile);
...@@ -338,6 +333,7 @@ int32_t walEndSnapshot(SWal *pWal) { ...@@ -338,6 +333,7 @@ int32_t walEndSnapshot(SWal *pWal) {
} else { } else {
wDebug("vgId:%d, wal no remove", pWal->cfg.vgId); wDebug("vgId:%d, wal no remove", pWal->cfg.vgId);
} }
// iterate files, until the searched result // iterate files, until the searched result
for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) { for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) {
wDebug("vgId:%d, wal check remove file %" PRId64 "(file size %" PRId64 " close ts %" PRId64 wDebug("vgId:%d, wal check remove file %" PRId64 "(file size %" PRId64 " close ts %" PRId64
...@@ -350,34 +346,17 @@ int32_t walEndSnapshot(SWal *pWal) { ...@@ -350,34 +346,17 @@ int32_t walEndSnapshot(SWal *pWal) {
wDebug("vgId:%d, check pass", pWal->cfg.vgId); wDebug("vgId:%d, check pass", pWal->cfg.vgId);
deleteCnt++; deleteCnt++;
newTotSize -= iter->fileSize; newTotSize -= iter->fileSize;
taosArrayPush(pWal->toDeleteFiles, iter);
} }
wDebug("vgId:%d, check not pass", pWal->cfg.vgId); wDebug("vgId:%d, check not pass", pWal->cfg.vgId);
} }
wDebug("vgId:%d, wal should delete %d files", pWal->cfg.vgId, deleteCnt);
int32_t actualDelete = 0;
char fnameStr[WAL_FILE_LEN];
// remove file
for (int i = 0; i < deleteCnt; i++) {
pInfo = taosArrayGet(pWal->fileInfoSet, i);
walBuildLogName(pWal, pInfo->firstVer, fnameStr);
wDebug("vgId:%d, wal remove file %s", pWal->cfg.vgId, fnameStr);
if (taosRemoveFile(fnameStr) < 0) {
goto UPDATE_META;
}
walBuildIdxName(pWal, pInfo->firstVer, fnameStr);
wDebug("vgId:%d, wal remove file %s", pWal->cfg.vgId, fnameStr);
if (taosRemoveFile(fnameStr) < 0) {
ASSERT(0);
}
actualDelete++;
}
UPDATE_META: UPDATE_META:
// make new array, remove files // make new array, remove files
taosArrayPopFrontBatch(pWal->fileInfoSet, actualDelete); taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt);
if (taosArrayGetSize(pWal->fileInfoSet) == 0) { if (taosArrayGetSize(pWal->fileInfoSet) == 0) {
pWal->writeCur = -1; pWal->writeCur = -1;
pWal->vers.firstVer = -1; pWal->vers.firstVer = pWal->vers.lastVer + 1;
} else { } else {
pWal->vers.firstVer = ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, 0))->firstVer; pWal->vers.firstVer = ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
} }
...@@ -392,6 +371,26 @@ int32_t walEndSnapshot(SWal *pWal) { ...@@ -392,6 +371,26 @@ int32_t walEndSnapshot(SWal *pWal) {
goto END; goto END;
} }
// delete files
deleteCnt = taosArrayGetSize(pWal->toDeleteFiles);
wDebug("vgId:%d, wal should delete %d files", pWal->cfg.vgId, deleteCnt);
char fnameStr[WAL_FILE_LEN];
for (int i = 0; i < deleteCnt; i++) {
pInfo = taosArrayGet(pWal->toDeleteFiles, i);
walBuildLogName(pWal, pInfo->firstVer, fnameStr);
wDebug("vgId:%d, wal remove file %s", pWal->cfg.vgId, fnameStr);
if (taosRemoveFile(fnameStr) < 0 && errno != ENOENT) {
wError("vgId:%d, failed to remove log file %s due to %s", pWal->cfg.vgId, fnameStr, strerror(errno));
goto END;
}
walBuildIdxName(pWal, pInfo->firstVer, fnameStr);
wDebug("vgId:%d, wal remove file %s", pWal->cfg.vgId, fnameStr);
if (taosRemoveFile(fnameStr) < 0 && errno != ENOENT) {
ASSERT(0);
}
}
taosArrayClear(pWal->toDeleteFiles);
END: END:
taosThreadMutexUnlock(&pWal->mutex); taosThreadMutexUnlock(&pWal->mutex);
return code; return code;
...@@ -489,9 +488,7 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy ...@@ -489,9 +488,7 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
SWalFileInfo *pFileInfo = walGetCurFileInfo(pWal); SWalFileInfo *pFileInfo = walGetCurFileInfo(pWal);
ASSERT(pFileInfo != NULL); ASSERT(pFileInfo != NULL);
if (pFileInfo->firstVer == -1) { ASSERT(pFileInfo->firstVer != -1);
pFileInfo->firstVer = index;
}
pWal->writeHead.head.version = index; pWal->writeHead.head.version = index;
pWal->writeHead.head.bodyLen = bodyLen; pWal->writeHead.head.bodyLen = bodyLen;
pWal->writeHead.head.msgType = msgType; pWal->writeHead.head.msgType = msgType;
...@@ -527,7 +524,10 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy ...@@ -527,7 +524,10 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
} }
// set status // set status
if (pWal->vers.firstVer == -1) pWal->vers.firstVer = index; if (pWal->vers.firstVer == -1) {
ASSERT(index == 0);
pWal->vers.firstVer = 0;
}
pWal->vers.lastVer = index; pWal->vers.lastVer = index;
pWal->totSize += sizeof(SWalCkHead) + bodyLen; pWal->totSize += sizeof(SWalCkHead) + bodyLen;
pFileInfo->lastVer = index; pFileInfo->lastVer = index;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册