diff --git a/src/wal/src/walWrite.c b/src/wal/src/walWrite.c index 09b65fa567a96be4978fc0a4b4f62b9e1da2de34..4440e3f0de5136566537905ec938951b709292da 100644 --- a/src/wal/src/walWrite.c +++ b/src/wal/src/walWrite.c @@ -189,6 +189,40 @@ int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) { return code; } +static void walFtruncate(SWal *pWal, int32_t fd, int64_t offset) { + taosFtruncate(fd, offset); + fsync(fd); +} + +static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int32_t fd, int64_t *offset) { + int64_t pos = *offset; + while (1) { + pos++; + + if (lseek(fd, pos, SEEK_SET) < 0) { + wError("vgId:%d, failed to seek from corrupted wal file since %s", pWal->vgId, strerror(errno)); + return TSDB_CODE_WAL_FILE_CORRUPTED; + } + + if (taosTRead(fd, pHead, sizeof(SWalHead)) <= 0) { + wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos); + return TSDB_CODE_WAL_FILE_CORRUPTED; + } + + if (pHead->signature != WAL_SIGNATURE) { + continue; + } + + if (taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) { + wInfo("vgId:%d, wal head cksum check passed, offset:%" PRId64, pWal->vgId, pos); + *offset = pos; + return TSDB_CODE_SUCCESS; + } + } + + return TSDB_CODE_WAL_FILE_CORRUPTED; +} + static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name) { int32_t size = WAL_MAX_SIZE; void * buffer = tmalloc(size); @@ -222,16 +256,18 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch if (ret < sizeof(SWalHead)) { wError("vgId:%d, file:%s, failed to read wal head, ret is %d", pWal->vgId, name, ret); - taosFtruncate(fd, offset); - fsync(fd); + walFtruncate(pWal, fd, offset); break; } if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) { - wError("vgId:%d, file:%s, wal head cksum is messed up, offset:%" PRId64, pWal->vgId, name, offset); - code = TSDB_CODE_WAL_FILE_CORRUPTED; - ASSERT(false); - break; + wError("vgId:%d, file:%s, wal head cksum is messed up, ver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name, + pHead->version, pHead->len, offset); + code = walSkipCorruptedRecord(pWal, pHead, fd, &offset); + if (code != TSDB_CODE_SUCCESS) { + walFtruncate(pWal, fd, offset); + break; + } } if (pHead->len > size - sizeof(SWalHead)) { @@ -255,9 +291,8 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch if (ret < pHead->len) { wError("vgId:%d, file:%s, failed to read wal body, ret:%d len:%d", pWal->vgId, name, ret, pHead->len); - taosFtruncate(fd, offset); - fsync(fd); - break; + offset += sizeof(SWalHead); + continue; } offset = offset + sizeof(SWalHead) + pHead->len;