From e535e660d255c509345c79539009e9311795903a Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Thu, 3 Nov 2022 15:34:29 +0800 Subject: [PATCH] fix(wal): wal corrupt and repair --- src/inc/twal.h | 6 ++ src/sync/src/syncRetrieve.c | 88 ++++++++++++++++++++++-- src/util/inc/tfile.h | 1 + src/util/src/tfile.c | 7 ++ src/wal/inc/walInt.h | 2 - src/wal/src/walWrite.c | 129 +++++++++++++++++++----------------- 6 files changed, 162 insertions(+), 71 deletions(-) diff --git a/src/inc/twal.h b/src/inc/twal.h index daea34daed..f3f7972813 100644 --- a/src/inc/twal.h +++ b/src/inc/twal.h @@ -19,6 +19,9 @@ extern "C" { #endif +#define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFE)) +#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + 16) + typedef enum { TAOS_WAL_NOLOG = 0, TAOS_WAL_WRITE = 1, @@ -68,6 +71,9 @@ uint64_t walGetVersion(twalh); void walResetVersion(twalh, uint64_t newVer); int64_t walGetFSize(twalh); +// sync read wal interface +int walValidateChecksum(SWalHead *pHead); + #ifdef __cplusplus } #endif diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index f0fcf6d6dd..b2883924bd 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -134,8 +134,56 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { return 0; } +static int32_t syncSkipCorruptedRecord(SWalHead *pHead, int32_t fd) { + int64_t pos = taosLSeek(fd, 0, SEEK_CUR); + if (pos < 0) { + wError("fd:%d, skip corrupte taosLSeek return error. pos=%" PRId64, fd, pos); + return TSDB_CODE_WAL_FILE_CORRUPTED; + } + // save start position of the bad bytes + int64_t start = pos; + while (1) { + pos++; + if (taosLSeek(fd, pos, SEEK_SET) < 0) { + sError("fd:%d, failed to seek from corrupted wal file pos=%" PRId64 ".since %s", fd, pos, strerror(errno)); + return TSDB_CODE_WAL_FILE_CORRUPTED; + } + + if (fdRead(fd, pHead, sizeof(SWalHead)) <= 0) { + sError("fd:%d, failed to read wal head from corrupted wal. 
pos=%" PRId64 ".since %s", fd, pos, strerror(errno)); + return TSDB_CODE_WAL_FILE_CORRUPTED; + } + + if (pHead->signature != WAL_SIGNATURE) { + continue; + } + + if (pHead->sver == 0) { + // old format wal, only check head crc + if (walValidateChecksum(pHead)) { + sInfo("fd:%d, old wal read head ok, pHead->len=%d, skip bad bytes=%" PRId64 " right pos:%" PRId64, fd, pHead->len, pos - start, pos); + return TSDB_CODE_SUCCESS; + } + } else { + // new format wal, check head + body crc + if (fdRead(fd, pHead->cont, pHead->len) < pHead->len) { + sError("fd:%d, read to end of corrupted wal file, offset:%" PRId64, fd, pos); + return TSDB_CODE_WAL_FILE_CORRUPTED; + } + + if (walValidateChecksum(pHead)) { + sInfo("fd:%d, wal read head ok, pHead->len=%d, skip bad bytes=%" PRId64 " right pos:%" PRId64, fd, pHead->len, pos - start, pos); + return TSDB_CODE_SUCCESS; + } + } + } + + return TSDB_CODE_WAL_FILE_CORRUPTED; +} + // if only a partial record is read out, upper layer will reload the file to get a complete record static int32_t syncReadOneWalRecord(int32_t sfd, SWalHead *pHead) { + int32_t code = TSDB_CODE_SUCCESS; int32_t ret = read(sfd, pHead, sizeof(SWalHead)); if (ret < 0) { sError("sfd:%d, failed to read wal head since %s, ret:%d", sfd, strerror(errno), ret); @@ -153,18 +201,44 @@ static int32_t syncReadOneWalRecord(int32_t sfd, SWalHead *pHead) { return 0; } - assert(pHead->len <= TSDB_MAX_WAL_SIZE); + // check wal head valid + if (pHead->sver == 0 && !walValidateChecksum(pHead)) { + sError("sfd:%d, old wal head cksum is messed up, sver=%d version:%" PRIu64 " len:%d", sfd, pHead->sver, pHead->version, pHead->len); + code = syncSkipCorruptedRecord(pHead, sfd); + if (code != TSDB_CODE_SUCCESS) { + sError("sfd:%d, wal corrupted and skip failed crc check, code:%d", sfd, code); + return -1; + } + } + + if (pHead->len < 0 || pHead->len > WAL_MAX_SIZE - sizeof(SWalHead)) { + sError("sfd:%d, wal head len out of range, hver:%" PRIu64 " len:%d", sfd, pHead->version, 
pHead->len); + code = syncSkipCorruptedRecord(pHead, sfd); + if (code != TSDB_CODE_SUCCESS) { + sError("sfd:%d, wal corrupted and skip failed length check, code:%d", sfd, code); + return -1; + } + } - ret = read(sfd, pHead->cont, pHead->len); + // read body + ret = (int32_t)read(sfd, pHead->cont, pHead->len); if (ret < 0) { - sError("sfd:%d, failed to read wal content since %s, ret:%d", sfd, strerror(errno), ret); + sError("sfd:%d, wal read wal cont failed. read len=%d, ret:%d", sfd, pHead->len, ret); return -1; } - if (ret != pHead->len) { - // file is not at end yet, it shall be reloaded - sInfo("sfd:%d, a partial wal conetnt is read out, ret:%d", sfd, ret); - return 0; + if (ret < pHead->len) { + sError("sfd:%d, wal read wal cont length small. need read len=%d, ret len:%d", sfd, pHead->len, ret); + return -1; + } + + if (pHead->sver != 0 && !walValidateChecksum(pHead)) { + sError("sfd:%d, wal check sum failed, sver=%d version:%" PRIu64 " len:%d", sfd, pHead->sver, pHead->version, pHead->len); + code = syncSkipCorruptedRecord(pHead, sfd); + if (code != TSDB_CODE_SUCCESS) { + sError("sfd:%d, wal read body check sum not right and skip corrupted failed, code:%d", sfd, code); + return -1; + } } return sizeof(SWalHead) + pHead->len; diff --git a/src/util/inc/tfile.h b/src/util/inc/tfile.h index 11a04cdf94..62ca627163 100644 --- a/src/util/inc/tfile.h +++ b/src/util/inc/tfile.h @@ -33,6 +33,7 @@ int64_t tfOpenM(const char *pathname, int32_t flags, mode_t mode); int64_t tfClose(int64_t tfd); int64_t tfWrite(int64_t tfd, void *buf, int64_t count); int64_t tfRead(int64_t tfd, void *buf, int64_t count); +int64_t fdRead(int32_t fd, void *buf, int64_t count); int32_t tfFsync(int64_t tfd); bool tfValid(int64_t tfd); int64_t tfLseek(int64_t tfd, int64_t offset, int32_t whence); diff --git a/src/util/src/tfile.c b/src/util/src/tfile.c index d975995b21..4e41e259af 100644 --- a/src/util/src/tfile.c +++ b/src/util/src/tfile.c @@ -93,6 +93,13 @@ int64_t tfRead(int64_t tfd, void 
*buf, int64_t count) { return ret; } +int64_t fdRead(int32_t fd, void *buf, int64_t count) { + int64_t ret = taosRead(fd, buf, count); + if (ret < 0) terrno = TAOS_SYSTEM_ERROR(errno); + return ret; +} + + int32_t tfFsync(int64_t tfd) { void *p = taosAcquireRef(tsFileRsetId, tfd); if (p == NULL) return -1; diff --git a/src/wal/inc/walInt.h b/src/wal/inc/walInt.h index 890b404ce9..3062b44dfd 100644 --- a/src/wal/inc/walInt.h +++ b/src/wal/inc/walInt.h @@ -34,8 +34,6 @@ extern int32_t wDebugFlag; #define WAL_PREFIX "wal" #define WAL_PREFIX_LEN 3 #define WAL_REFRESH_MS 1000 -#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + 16) -#define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFE)) #define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12) #define WAL_FILE_LEN (WAL_PATH_LEN + 32) #define WAL_FILE_NUM 1 // 3 diff --git a/src/wal/src/walWrite.c b/src/wal/src/walWrite.c index 3f2df3f624..b64f81a82c 100644 --- a/src/wal/src/walWrite.c +++ b/src/wal/src/walWrite.c @@ -112,7 +112,7 @@ void walRemoveAllOldFiles(void *handle) { pthread_mutex_unlock(&pWal->mutex); } -#if defined(WAL_CHECKSUM_WHOLE) + static void walUpdateChecksum(SWalHead *pHead) { pHead->sver = 2; @@ -123,17 +123,16 @@ static void walUpdateChecksum(SWalHead *pHead) { static int walValidateChecksum(SWalHead *pHead) { if (pHead->sver == 0) { // for compatible with wal before sver 1 return taosCheckChecksumWhole((uint8_t *)pHead, sizeof(*pHead)); - } else if (pHead->sver >= 1) { + } else { + // new wal format uint32_t cksum = pHead->cksum; pHead->cksum = 0; - return taosCheckChecksum((uint8_t *)pHead, sizeof(*pHead) + pHead->len, cksum); + int ret = taosCheckChecksum((uint8_t *)pHead, sizeof(*pHead) + pHead->len, cksum); + pHead->cksum = cksum; // must restore cksum for the next walValidateChecksum call + return ret; } - - return 0; } -#endif - int32_t walWrite(void *handle, SWalHead *pHead) { if (handle == NULL) return -1; @@ -275,7 +274,52 @@ static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd, 
continue; } -#if defined(WAL_CHECKSUM_WHOLE) + if (pHead->sver == 0) { + // old wal file format, only check head data crc + if (walValidateChecksum(pHead)) { + wInfo("vgId:%d, wal head cksum check passed, offset:%" PRId64, pWal->vgId, pos); + *offset = pos; + return TSDB_CODE_SUCCESS; + } + } else { + // maybe new wal file format, read body data and check head + body crc + if (tfRead(tfd, pHead->cont, pHead->len) < pHead->len) { + wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos); + return TSDB_CODE_WAL_FILE_CORRUPTED; + } + + // check head + body crc + if (walValidateChecksum(pHead)) { + wInfo("vgId:%d, wal whole cksum check passed, offset:%" PRId64, pWal->vgId, pos); + *offset = pos; + return TSDB_CODE_SUCCESS; + } + } + } + + return TSDB_CODE_WAL_FILE_CORRUPTED; +} + +static int32_t walSkipOldCorrupted(SWal *pWal, SWalHead *pHead, int64_t tfd, int64_t *offset) { + int64_t pos = *offset; + while (1) { + pos++; + + if (tfLseek(tfd, pos, SEEK_SET) < 0) { + wError("vgId:%d, failed to seek from corrupted wal file since %s", pWal->vgId, strerror(errno)); + return TSDB_CODE_WAL_FILE_CORRUPTED; + } + + if (tfRead(tfd, pHead, sizeof(SWalHead)) <= 0) { + wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos); + return TSDB_CODE_WAL_FILE_CORRUPTED; + } + + if (pHead->signature != WAL_SIGNATURE) { + continue; + } + + if (pHead->sver == 0 && walValidateChecksum(pHead)) { wInfo("vgId:%d, wal head cksum check passed, offset:%" PRId64, pWal->vgId, pos); *offset = pos; @@ -284,29 +328,23 @@ static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd, if (pHead->sver >= 1) { if (tfRead(tfd, pHead->cont, pHead->len) < pHead->len) { - wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos); - return TSDB_CODE_WAL_FILE_CORRUPTED; + wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos); + return TSDB_CODE_WAL_FILE_CORRUPTED; } if 
(walValidateChecksum(pHead)) { - wInfo("vgId:%d, wal whole cksum check passed, offset:%" PRId64, pWal->vgId, pos); - *offset = pos; - return TSDB_CODE_SUCCESS; + wInfo("vgId:%d, wal whole cksum check passed, offset:%" PRId64, pWal->vgId, pos); + *offset = pos; + return TSDB_CODE_SUCCESS; } } - -#else - if (taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) { - wInfo("vgId:%d, wal head cksum check passed, offset:%" PRId64, pWal->vgId, pos); - *offset = pos; - return TSDB_CODE_SUCCESS; - } - -#endif } return TSDB_CODE_WAL_FILE_CORRUPTED; } + + + // Add SMemRowType ahead of SDataRow static void expandSubmitBlk(SSubmitBlk *pDest, SSubmitBlk *pSrc, int32_t *lenExpand) { // copy the header firstly @@ -454,9 +492,9 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch break; } -#if defined(WAL_CHECKSUM_WHOLE) - if ((pHead->sver == 0 && !walValidateChecksum(pHead)) || pHead->sver < 0 || pHead->sver > 2) { - wError("vgId:%d, file:%s, wal head cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name, + // sver == 0 is old wal format, other is new wal + if (pHead->sver == 0 && !walValidateChecksum(pHead)) { + wError("vgId:%d, file:%s, old wal head cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name, pHead->version, pHead->len, offset); code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset); if (code != TSDB_CODE_SUCCESS) { @@ -465,7 +503,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch } } - if (pHead->len < 0 || pHead->len > size - sizeof(SWalHead)) { + if ( pHead->sver == 0 && (pHead->len < 0 || pHead->len > size - sizeof(SWalHead))) { wError("vgId:%d, file:%s, wal head len out of range, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name, pHead->version, pHead->len, offset); code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset); @@ -488,7 +526,9 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch continue; 
} - if ((pHead->sver >= 1) && !walValidateChecksum(pHead)) { + // check new wal sum head + body crc + if ((pHead->sver != 0) && !walValidateChecksum(pHead)) { + // new format wal corrupted wError("vgId:%d, file:%s, wal whole cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name, pHead->version, pHead->len, offset); code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset); @@ -498,41 +538,6 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch } } -#else - if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) { - wError("vgId:%d, file:%s, wal head cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name, - pHead->version, pHead->len, offset); - code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset); - if (code != TSDB_CODE_SUCCESS) { - walFtruncate(pWal, tfd, offset); - break; - } - } - - if (pHead->len < 0 || pHead->len > size - sizeof(SWalHead)) { - wError("vgId:%d, file:%s, wal head len out of range, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name, - pHead->version, pHead->len, offset); - code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset); - if (code != TSDB_CODE_SUCCESS) { - walFtruncate(pWal, tfd, offset); - break; - } - } - - ret = (int32_t)tfRead(tfd, pHead->cont, pHead->len); - if (ret < 0) { - wError("vgId:%d, file:%s, failed to read wal body since %s", pWal->vgId, name, strerror(errno)); - code = TAOS_SYSTEM_ERROR(errno); - break; - } - - if (ret < pHead->len) { - wError("vgId:%d, file:%s, failed to read wal body, ret:%d len:%d", pWal->vgId, name, ret, pHead->len); - offset += sizeof(SWalHead); - continue; - } - -#endif offset = offset + sizeof(SWalHead) + pHead->len; wTrace("vgId:%d, restore wal, fileId:%" PRId64 " hver:%" PRIu64 " wver:%" PRIu64 " len:%d offset:%" PRId64, -- GitLab