diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index cda7e35b0fa1c89b6d5b422193562d3ab777602c..a12f8051ba982ed627ed0767b76d344678748ca9 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -295,6 +295,36 @@ void walAlignVersions(SWal* pWal) { wInfo("vgId:%d, reset commitVer to %" PRId64, pWal->cfg.vgId, pWal->vers.commitVer); } +int walRepairLogFileTs(SWal* pWal, bool* updateMeta) { + int32_t sz = taosArrayGetSize(pWal->fileInfoSet); + int32_t fileIdx = -1; + int32_t lastCloseTs = 0; + char fnameStr[WAL_FILE_LEN] = {0}; + + while (++fileIdx < sz - 1) { + SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx); + if (pFileInfo->closeTs != -1) { + lastCloseTs = pFileInfo->closeTs; + continue; + } + + walBuildLogName(pWal, pFileInfo->firstVer, fnameStr); + int32_t mtime = 0; + if (taosStatFile(fnameStr, NULL, &mtime) < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + wError("vgId:%d, failed to stat file due to %s, file:%s", pWal->cfg.vgId, strerror(errno), fnameStr); + return -1; + } + + if (updateMeta != NULL) *updateMeta = true; + if (pFileInfo->createTs == -1) pFileInfo->createTs = lastCloseTs; + pFileInfo->closeTs = mtime; + lastCloseTs = pFileInfo->closeTs; + } + + return 0; +} + bool walLogEntriesComplete(const SWal* pWal) { int32_t sz = taosArrayGetSize(pWal->fileInfoSet); bool complete = true; @@ -433,15 +463,8 @@ int walCheckAndRepairMeta(SWal* pWal) { wError("failed to scan wal last ver since %s", terrstr()); return -1; } - // remove the empty wal log, and its idx - wInfo("vgId:%d, wal remove empty file %s", pWal->cfg.vgId, fnameStr); - taosRemoveFile(fnameStr); - walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr); - wInfo("vgId:%d, wal remove empty file %s", pWal->cfg.vgId, fnameStr); - taosRemoveFile(fnameStr); - // remove its meta entry - taosArrayRemove(pWal->fileInfoSet, fileIdx); - continue; + // empty log file + lastVer = pFileInfo->firstVer - 1; } // update lastVer @@ -460,6 +483,11 @@ int walCheckAndRepairMeta(SWal* pWal) { } (void)walAlignVersions(pWal); + // repair ts of files + if (walRepairLogFileTs(pWal, &updateMeta) < 0) { + return -1; + } + // update meta file if (updateMeta) { (void)walSaveMeta(pWal); diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 848de4f36da4db57e881df4977b4f2399314a68f..9b7b3dfd5006857eb57b5e74edd39a3835fdbd71 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -284,15 +284,15 @@ int32_t walEndSnapshot(SWal *pWal) { if (ver == -1) { code = -1; goto END; - }; + } pWal->vers.snapshotVer = ver; int ts = taosGetTimestampSec(); - ver = TMAX(ver - pWal->vers.logRetention, pWal->vers.firstVer - 1); + // compatible mode for refVer bool hasTopic = false; - int64_t refVer = ver; + int64_t refVer = INT64_MAX; void *pIter = NULL; while (1) { pIter = taosHashIterate(pWal->pRefHash, pIter); @@ -300,54 +300,40 @@ int32_t walEndSnapshot(SWal *pWal) { SWalRef *pRef = *(SWalRef **)pIter; if (pRef->refVer == -1) continue; refVer = TMIN(refVer, pRef->refVer - 1); - wDebug("vgId:%d, wal found ref %" PRId64 ", refId %" PRId64, pWal->cfg.vgId, pRef->refVer, pRef->refId); hasTopic = true; } - // compatible mode if (pWal->cfg.retentionPeriod == 0 && hasTopic) { + wInfo("vgId:%d, wal found refVer:%" PRId64 " in compatible mode, ver:%" PRId64, pWal->cfg.vgId, refVer, ver); ver = TMIN(ver, refVer); } + // find files safe to delete int deleteCnt = 0; int64_t newTotSize = pWal->totSize; - SWalFileInfo tmp; + SWalFileInfo tmp = {0}; tmp.firstVer = ver; - // find files safe to delete SWalFileInfo *pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE); + if (pInfo) { - SWalFileInfo *pLastFileInfo = taosArrayGetLast(pWal->fileInfoSet); - wDebug("vgId:%d, wal search found file info: first:%" PRId64 " last:%" PRId64, pWal->cfg.vgId, pInfo->firstVer, - pInfo->lastVer); - if (ver >= pInfo->lastVer) { + wDebug("vgId:%d, wal search found file info. ver:%" PRId64 ", first:%" PRId64 " last:%" PRId64, pWal->cfg.vgId, ver, + pInfo->firstVer, pInfo->lastVer); + ASSERT(ver <= pInfo->lastVer); + if (ver == pInfo->lastVer) { pInfo++; - wDebug("vgId:%d, wal remove advance one file: first:%" PRId64 " last:%" PRId64, pWal->cfg.vgId, pInfo->firstVer, - pInfo->lastVer); - } - if (pInfo <= pLastFileInfo) { - wDebug("vgId:%d, wal end remove for first:%" PRId64 " last:%" PRId64, pWal->cfg.vgId, pInfo->firstVer, - pInfo->lastVer); - } else { - wDebug("vgId:%d, wal no remove", pWal->cfg.vgId); } // iterate files, until the searched result + // delete according to file size or close time for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) { - wDebug("vgId:%d, wal check remove file %" PRId64 "(file size %" PRId64 " close ts %" PRId64 - "), new tot size %" PRId64, - pWal->cfg.vgId, iter->firstVer, iter->fileSize, iter->closeTs, newTotSize); - if ((pWal->cfg.retentionSize != -1 && pWal->cfg.retentionSize != 0 && newTotSize > pWal->cfg.retentionSize) || - ((pWal->cfg.retentionPeriod == 0) || (pWal->cfg.retentionPeriod != -1 && iter->closeTs != -1 && - iter->closeTs + pWal->cfg.retentionPeriod < ts))) { - // delete according to file size or close time - wDebug("vgId:%d, check pass", pWal->cfg.vgId); + if ((pWal->cfg.retentionSize > 0 && newTotSize > pWal->cfg.retentionSize) || + (pWal->cfg.retentionPeriod == 0 || + pWal->cfg.retentionPeriod > 0 && iter->closeTs >= 0 && iter->closeTs + pWal->cfg.retentionPeriod < ts)) { deleteCnt++; newTotSize -= iter->fileSize; taosArrayPush(pWal->toDeleteFiles, iter); } - wDebug("vgId:%d, check not pass", pWal->cfg.vgId); } - UPDATE_META: // make new array, remove files taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt); if (taosArrayGetSize(pWal->fileInfoSet) == 0) { @@ -357,11 +343,12 @@ int32_t walEndSnapshot(SWal *pWal) { pWal->vers.firstVer = ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, 0))->firstVer; } } + + // update meta pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1; pWal->totSize = newTotSize; pWal->vers.verInSnapshotting = -1; - // save snapshot ver, commit ver code = walSaveMeta(pWal); if (code < 0) { goto END; @@ -369,23 +356,27 @@ int32_t walEndSnapshot(SWal *pWal) { // delete files deleteCnt = taosArrayGetSize(pWal->toDeleteFiles); - wDebug("vgId:%d, wal should delete %d files", pWal->cfg.vgId, deleteCnt); - char fnameStr[WAL_FILE_LEN]; + char fnameStr[WAL_FILE_LEN] = {0}; + pInfo = NULL; + for (int i = 0; i < deleteCnt; i++) { pInfo = taosArrayGet(pWal->toDeleteFiles, i); + walBuildLogName(pWal, pInfo->firstVer, fnameStr); - wDebug("vgId:%d, wal remove file %s", pWal->cfg.vgId, fnameStr); if (taosRemoveFile(fnameStr) < 0 && errno != ENOENT) { wError("vgId:%d, failed to remove log file %s due to %s", pWal->cfg.vgId, fnameStr, strerror(errno)); goto END; } walBuildIdxName(pWal, pInfo->firstVer, fnameStr); - wDebug("vgId:%d, wal remove file %s", pWal->cfg.vgId, fnameStr); if (taosRemoveFile(fnameStr) < 0 && errno != ENOENT) { wError("vgId:%d, failed to remove idx file %s due to %s", pWal->cfg.vgId, fnameStr, strerror(errno)); goto END; } } + if (pInfo != NULL) { + wInfo("vgId:%d, wal log files recycled. count:%d, until ver:%" PRId64 ", closeTs:%" PRId64, pWal->cfg.vgId, + deleteCnt, pInfo->lastVer, pInfo->closeTs); + } taosArrayClear(pWal->toDeleteFiles); END: