diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 5284aeff779f6b26f6f5a39aba61c781264108fd..c69046f707a1fddb7a593771ad15535a70615ff8 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -116,7 +116,6 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) { } #endif } - // TODO truncate file if (found == NULL) { // file corrupted, no complete log @@ -125,8 +124,20 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) { terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } + + // truncate file SWalCkHead* lastEntry = (SWalCkHead*)found; int64_t retVer = lastEntry->head.version; + int64_t lastEntryBeginOffset = offset + (int64_t)((char*)found - (char*)buf); + int64_t lastEntryEndOffset = lastEntryBeginOffset + sizeof(SWalCkHead) + lastEntry->head.bodyLen; + if (lastEntryEndOffset != fileSize) { + wWarn("vgId:%d repair meta truncate file %s to %ld, orig size %ld", pWal->cfg.vgId, fnameStr, lastEntryEndOffset, + fileSize); + taosFtruncateFile(pFile, lastEntryEndOffset); + ((SWalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->fileSize = lastEntryEndOffset; + pWal->totSize -= (fileSize - lastEntryEndOffset); + } + taosCloseFile(&pFile); taosMemoryFree(buf); @@ -226,16 +237,92 @@ int walCheckAndRepairMeta(SWal* pWal) { } } - // TODO: set fileSize and lastVer if necessary - return 0; } int walCheckAndRepairIdx(SWal* pWal) { - // TODO: iterate all log files - // if idx not found, scan log and write idx - // if found, check complete by first and last entry of each idx file - // if idx incomplete, binary search last valid entry, and then build other part + int32_t sz = taosArrayGetSize(pWal->fileInfoSet); + for (int32_t i = 0; i < sz; i++) { + SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, i); + + char fnameStr[WAL_FILE_LEN]; + walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr); + int64_t fsize; + TdFilePtr pIdxFile = taosOpenFile(fnameStr, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE); + if (pIdxFile == NULL) { + ASSERT(0); + terrno = TAOS_SYSTEM_ERROR(errno); + wError("vgId:%d, cannot open file %s, since %s", pWal->cfg.vgId, fnameStr, terrstr()); + return -1; + } + + taosFStatFile(pIdxFile, &fsize, NULL); + if (fsize == (pFileInfo->lastVer - pFileInfo->firstVer + 1) * sizeof(SWalIdxEntry)) { + taosCloseFile(&pIdxFile); + continue; + } + + int32_t left = fsize % sizeof(SWalIdxEntry); + int64_t offset = taosLSeekFile(pIdxFile, -left, SEEK_END); + if (left != 0) { + taosFtruncateFile(pIdxFile, offset); + wWarn("vgId:%d wal truncate file %s to offset %ld since size invalid, file size %ld", pWal->cfg.vgId, fnameStr, + offset, fsize); + } + offset -= sizeof(SWalIdxEntry); + + SWalIdxEntry idxEntry = {.ver = pFileInfo->firstVer}; + while (1) { + if (offset < 0) { + taosLSeekFile(pIdxFile, 0, SEEK_SET); + taosWriteFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry)); + break; + } + taosLSeekFile(pIdxFile, offset, SEEK_SET); + int64_t contLen = taosReadFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry)); + if (contLen < 0 || contLen != sizeof(SWalIdxEntry)) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + if ((idxEntry.ver - pFileInfo->firstVer) * sizeof(SWalIdxEntry) != offset) { + taosFtruncateFile(pIdxFile, offset); + wWarn("vgId:%d wal truncate file %s to offset %ld since entry invalid, entry ver %ld, entry offset %ld", + pWal->cfg.vgId, fnameStr, offset, idxEntry.ver, idxEntry.offset); + offset -= sizeof(SWalIdxEntry); + } else { + break; + } + } + + if (idxEntry.ver < pFileInfo->lastVer) { + char fLogNameStr[WAL_FILE_LEN]; + walBuildLogName(pWal, pFileInfo->firstVer, fLogNameStr); + TdFilePtr pLogFile = taosOpenFile(fLogNameStr, TD_FILE_READ); + if (pLogFile == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + wError("vgId:%d, cannot open file %s, since %s", pWal->cfg.vgId, fLogNameStr, terrstr()); + return -1; + } + while (idxEntry.ver < pFileInfo->lastVer) { + taosLSeekFile(pLogFile, idxEntry.offset, SEEK_SET); + SWalCkHead ckHead; + taosReadFile(pLogFile, &ckHead, sizeof(SWalCkHead)); + if (idxEntry.ver != ckHead.head.version) { + // todo truncate this idx also + taosCloseFile(&pLogFile); + wError("vgId:%d, invalid repair case, log seek to %ld to find ver %ld, actual ver %ld", pWal->cfg.vgId, + idxEntry.offset, idxEntry.ver, ckHead.head.version); + return -1; + } + idxEntry.ver = ckHead.head.version + 1; + idxEntry.offset = idxEntry.offset + sizeof(SWalCkHead) + ckHead.head.bodyLen; + wWarn("vgId:%d wal idx append new entry %ld %ld", pWal->cfg.vgId, idxEntry.ver, idxEntry.offset); + taosWriteFile(pIdxFile, &idxEntry, sizeof(SWalIdxEntry)); + } + taosCloseFile(&pLogFile); + } + taosCloseFile(&pIdxFile); + } return 0; } diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index c939c8c43685ef3dc6ea0b4fde1cd5fbfb33a8d1..a55f00d27702294f6bf996690c80ca5e3765428a 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -149,15 +149,21 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { walLoadMeta(pWal); if (walCheckAndRepairMeta(pWal) < 0) { + wError("vgId:%d cannot open wal since repair meta file failed", pWal->cfg.vgId); taosHashCleanup(pWal->pRefHash); taosRemoveRef(tsWal.refSetId, pWal->refId); taosThreadMutexDestroy(&pWal->mutex); taosArrayDestroy(pWal->fileInfoSet); - taosMemoryFree(pWal); return NULL; } if (walCheckAndRepairIdx(pWal) < 0) { + wError("vgId:%d cannot open wal since repair idx file failed", pWal->cfg.vgId); + taosHashCleanup(pWal->pRefHash); + taosRemoveRef(tsWal.refSetId, pWal->refId); + taosThreadMutexDestroy(&pWal->mutex); + taosArrayDestroy(pWal->fileInfoSet); + return NULL; } wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level,