未验证 提交 14e1e470 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #16993 from taosdata/feature/wal

feat(wal): auto fix corrupt file
......@@ -116,7 +116,6 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
}
#endif
}
// TODO truncate file
if (found == NULL) {
// file corrupted, no complete log
......@@ -125,8 +124,20 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
// truncate file
SWalCkHead* lastEntry = (SWalCkHead*)found;
int64_t retVer = lastEntry->head.version;
int64_t lastEntryBeginOffset = offset + (int64_t)((char*)found - (char*)buf);
int64_t lastEntryEndOffset = lastEntryBeginOffset + sizeof(SWalCkHead) + lastEntry->head.bodyLen;
if (lastEntryEndOffset != fileSize) {
wWarn("vgId:%d repair meta truncate file %s to %ld, orig size %ld", pWal->cfg.vgId, fnameStr, lastEntryEndOffset,
fileSize);
taosFtruncateFile(pFile, lastEntryEndOffset);
((SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->fileSize = lastEntryEndOffset;
pWal->totSize -= (fileSize - lastEntryEndOffset);
}
taosCloseFile(&pFile);
taosMemoryFree(buf);
......@@ -226,16 +237,92 @@ int walCheckAndRepairMeta(SWal* pWal) {
}
}
// TODO: set fileSize and lastVer if necessary
return 0;
}
int walCheckAndRepairIdx(SWal* pWal) {
// TODO: iterate all log files
// if idx not found, scan log and write idx
// if found, check complete by first and last entry of each idx file
// if idx incomplete, binary search last valid entry, and then build other part
int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
for (int32_t i = 0; i < sz; i++) {
SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, i);
char fnameStr[WAL_FILE_LEN];
walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr);
int64_t fsize;
TdFilePtr pIdxFile = taosOpenFile(fnameStr, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE);
if (pIdxFile == NULL) {
ASSERT(0);
terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, cannot open file %s, since %s", pWal->cfg.vgId, fnameStr, terrstr());
return -1;
}
taosFStatFile(pIdxFile, &fsize, NULL);
if (fsize == (pFileInfo->lastVer - pFileInfo->firstVer + 1) * sizeof(SWalIdxEntry)) {
taosCloseFile(&pIdxFile);
continue;
}
int32_t left = fsize % sizeof(SWalIdxEntry);
int64_t offset = taosLSeekFile(pIdxFile, -left, SEEK_END);
if (left != 0) {
taosFtruncateFile(pIdxFile, offset);
wWarn("vgId:%d wal truncate file %s to offset %ld since size invalid, file size %ld", pWal->cfg.vgId, fnameStr,
offset, fsize);
}
offset -= sizeof(SWalIdxEntry);
SWalIdxEntry idxEntry = {.ver = pFileInfo->firstVer};
while (1) {
if (offset < 0) {
taosLSeekFile(pIdxFile, 0, SEEK_SET);
taosWriteFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry));
break;
}
taosLSeekFile(pIdxFile, offset, SEEK_SET);
int64_t contLen = taosReadFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry));
if (contLen < 0 || contLen != sizeof(SWalIdxEntry)) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
if ((idxEntry.ver - pFileInfo->firstVer) * sizeof(SWalIdxEntry) != offset) {
taosFtruncateFile(pIdxFile, offset);
wWarn("vgId:%d wal truncate file %s to offset %ld since entry invalid, entry ver %ld, entry offset %ld",
pWal->cfg.vgId, fnameStr, offset, idxEntry.ver, idxEntry.offset);
offset -= sizeof(SWalIdxEntry);
} else {
break;
}
}
if (idxEntry.ver < pFileInfo->lastVer) {
char fLogNameStr[WAL_FILE_LEN];
walBuildLogName(pWal, pFileInfo->firstVer, fLogNameStr);
TdFilePtr pLogFile = taosOpenFile(fLogNameStr, TD_FILE_READ);
if (pLogFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, cannot open file %s, since %s", pWal->cfg.vgId, fLogNameStr, terrstr());
return -1;
}
while (idxEntry.ver < pFileInfo->lastVer) {
taosLSeekFile(pLogFile, idxEntry.offset, SEEK_SET);
SWalCkHead ckHead;
taosReadFile(pLogFile, &ckHead, sizeof(SWalCkHead));
if (idxEntry.ver != ckHead.head.version) {
// todo truncate this idx also
taosCloseFile(&pLogFile);
wError("vgId:%d, invalid repair case, log seek to %ld to find ver %ld, actual ver %ld", pWal->cfg.vgId,
idxEntry.offset, idxEntry.ver, ckHead.head.version);
return -1;
}
idxEntry.ver = ckHead.head.version + 1;
idxEntry.offset = idxEntry.offset + sizeof(SWalCkHead) + ckHead.head.bodyLen;
wWarn("vgId:%d wal idx append new entry %ld %ld", pWal->cfg.vgId, idxEntry.ver, idxEntry.offset);
taosWriteFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry));
}
taosCloseFile(&pLogFile);
}
taosCloseFile(&pIdxFile);
}
return 0;
}
......
......@@ -149,15 +149,21 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
walLoadMeta(pWal);
if (walCheckAndRepairMeta(pWal) < 0) {
wError("vgId:%d cannot open wal since repair meta file failed", pWal->cfg.vgId);
taosHashCleanup(pWal->pRefHash);
taosRemoveRef(tsWal.refSetId, pWal->refId);
taosThreadMutexDestroy(&pWal->mutex);
taosArrayDestroy(pWal->fileInfoSet);
taosMemoryFree(pWal);
return NULL;
}
if (walCheckAndRepairIdx(pWal) < 0) {
wError("vgId:%d cannot open wal since repair idx file failed", pWal->cfg.vgId);
taosHashCleanup(pWal->pRefHash);
taosRemoveRef(tsWal.refSetId, pWal->refId);
taosThreadMutexDestroy(&pWal->mutex);
taosArrayDestroy(pWal->fileInfoSet);
return NULL;
}
wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册