提交 e535e660 编写于 作者: A Alex Duan

fix(wal): wal corrupt and repaire

上级 5c5fcc0a
...@@ -19,6 +19,9 @@ ...@@ -19,6 +19,9 @@
extern "C" { extern "C" {
#endif #endif
#define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFE))
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + 16)
typedef enum { typedef enum {
TAOS_WAL_NOLOG = 0, TAOS_WAL_NOLOG = 0,
TAOS_WAL_WRITE = 1, TAOS_WAL_WRITE = 1,
...@@ -68,6 +71,9 @@ uint64_t walGetVersion(twalh); ...@@ -68,6 +71,9 @@ uint64_t walGetVersion(twalh);
void walResetVersion(twalh, uint64_t newVer); void walResetVersion(twalh, uint64_t newVer);
int64_t walGetFSize(twalh); int64_t walGetFSize(twalh);
// sync read wal interface
int walValidateChecksum(SWalHead *pHead);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -134,8 +134,56 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { ...@@ -134,8 +134,56 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
return 0; return 0;
} }
static int32_t syncSkipCorruptedRecord(SWalHead *pHead, int32_t fd) {
int64_t pos = taosLSeek(fd, 0, SEEK_CUR);
if (pos < 0) {
wError("fd:%d, skip corrupte taosLSeek return error. pos=%" PRId64, fd, pos);
return TSDB_CODE_WAL_FILE_CORRUPTED;
}
// save start bad bytes positioin
int64_t start = pos;
while (1) {
pos++;
if (taosLSeek(fd, pos, SEEK_SET) < 0) {
sError("fd:%d, failed to seek from corrupted wal file pos=%" PRId64 ".since %s", fd, pos, strerror(errno));
return TSDB_CODE_WAL_FILE_CORRUPTED;
}
if (fdRead(fd, pHead, sizeof(SWalHead)) <= 0) {
sError("fd:%d, failed to read wal head from corrupted wal. pos=%" PRId64 ".since %s", fd, pos, strerror(errno));
return TSDB_CODE_WAL_FILE_CORRUPTED;
}
if (pHead->signature != WAL_SIGNATURE) {
continue;
}
if (pHead->sver == 0) {
// old format wal, only check head crc
if (walValidateChecksum(pHead)) {
sInfo("fd:%d, old wal read head ok, pHead->len=%d, skip bad bytes=%" PRId64 " right pos:%" PRId64, fd, pHead->len, pos - start, pos);
return TSDB_CODE_SUCCESS;
}
} else {
// new format wal, check head + body crc
if (fdRead(fd, pHead->cont, pHead->len) < pHead->len) {
sError("fd:%d, read to end of corrupted wal file, offset:%" PRId64, fd, pos);
return TSDB_CODE_WAL_FILE_CORRUPTED;
}
if (walValidateChecksum(pHead)) {
sInfo("fd:%d, wal read head ok, pHead->len=%d, skip bad bytes=%" PRId64 " right pos:%" PRId64, fd, pHead->len, pos - start, pos);
return TSDB_CODE_SUCCESS;
}
}
}
return TSDB_CODE_WAL_FILE_CORRUPTED;
}
// if only a partial record is read out, upper layer will reload the file to get a complete record // if only a partial record is read out, upper layer will reload the file to get a complete record
static int32_t syncReadOneWalRecord(int32_t sfd, SWalHead *pHead) { static int32_t syncReadOneWalRecord(int32_t sfd, SWalHead *pHead) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t ret = read(sfd, pHead, sizeof(SWalHead)); int32_t ret = read(sfd, pHead, sizeof(SWalHead));
if (ret < 0) { if (ret < 0) {
sError("sfd:%d, failed to read wal head since %s, ret:%d", sfd, strerror(errno), ret); sError("sfd:%d, failed to read wal head since %s, ret:%d", sfd, strerror(errno), ret);
...@@ -153,18 +201,44 @@ static int32_t syncReadOneWalRecord(int32_t sfd, SWalHead *pHead) { ...@@ -153,18 +201,44 @@ static int32_t syncReadOneWalRecord(int32_t sfd, SWalHead *pHead) {
return 0; return 0;
} }
assert(pHead->len <= TSDB_MAX_WAL_SIZE); // check wal head valid
if (pHead->sver == 0 && !walValidateChecksum(pHead)) {
sError("sfd:%d, old wal head cksum is messed up, sver=%d version:%" PRIu64 " len:%d", sfd, pHead->sver, pHead->version, pHead->len);
code = syncSkipCorruptedRecord(pHead, sfd);
if (code != TSDB_CODE_SUCCESS) {
sError("sfd:%d, wal corrupted and skip failed crc check, code:%d", sfd, code);
return -1;
}
}
if (pHead->len < 0 || pHead->len > WAL_MAX_SIZE - sizeof(SWalHead)) {
sError("sfd:%d, wal head len out of range, hver:%" PRIu64 " len:%d", sfd, pHead->version, pHead->len);
code = syncSkipCorruptedRecord(pHead, sfd);
if (code != TSDB_CODE_SUCCESS) {
sError("sfd:%d, wal corrupted and skip failed length check, code:%d", sfd, code);
return -1;
}
}
ret = read(sfd, pHead->cont, pHead->len); // read body
ret = (int32_t)read(sfd, pHead->cont, pHead->len);
if (ret < 0) { if (ret < 0) {
sError("sfd:%d, failed to read wal content since %s, ret:%d", sfd, strerror(errno), ret); sError("sfd:%d, wal read wal cont failed. read len=%d, ret:%d", sfd, pHead->len, ret);
return -1; return -1;
} }
if (ret != pHead->len) { if (ret < pHead->len) {
// file is not at end yet, it shall be reloaded sError("sfd:%d, wal read wal cont length small. need read len=%d, ret len:%d", sfd, pHead->len, ret);
sInfo("sfd:%d, a partial wal conetnt is read out, ret:%d", sfd, ret); return -1;
return 0; }
if (pHead->sver != 0 && !walValidateChecksum(pHead)) {
sError("sfd:%d, wal check sum failed, sver=%d version:%" PRIu64 " len:%d", sfd, pHead->sver, pHead->version, pHead->len);
code = syncSkipCorruptedRecord(pHead, sfd);
if (code != TSDB_CODE_SUCCESS) {
sError("sfd:%d, wal read body check sum not right and skip corrupted failed, code:%d", sfd, code);
return -1;
}
} }
return sizeof(SWalHead) + pHead->len; return sizeof(SWalHead) + pHead->len;
......
...@@ -33,6 +33,7 @@ int64_t tfOpenM(const char *pathname, int32_t flags, mode_t mode); ...@@ -33,6 +33,7 @@ int64_t tfOpenM(const char *pathname, int32_t flags, mode_t mode);
int64_t tfClose(int64_t tfd); int64_t tfClose(int64_t tfd);
int64_t tfWrite(int64_t tfd, void *buf, int64_t count); int64_t tfWrite(int64_t tfd, void *buf, int64_t count);
int64_t tfRead(int64_t tfd, void *buf, int64_t count); int64_t tfRead(int64_t tfd, void *buf, int64_t count);
int64_t fdRead(int32_t fd, void *buf, int64_t count);
int32_t tfFsync(int64_t tfd); int32_t tfFsync(int64_t tfd);
bool tfValid(int64_t tfd); bool tfValid(int64_t tfd);
int64_t tfLseek(int64_t tfd, int64_t offset, int32_t whence); int64_t tfLseek(int64_t tfd, int64_t offset, int32_t whence);
......
...@@ -93,6 +93,13 @@ int64_t tfRead(int64_t tfd, void *buf, int64_t count) { ...@@ -93,6 +93,13 @@ int64_t tfRead(int64_t tfd, void *buf, int64_t count) {
return ret; return ret;
} }
int64_t fdRead(int32_t fd, void *buf, int64_t count) {
int64_t ret = taosRead(fd, buf, count);
if (ret < 0) terrno = TAOS_SYSTEM_ERROR(errno);
return ret;
}
int32_t tfFsync(int64_t tfd) { int32_t tfFsync(int64_t tfd) {
void *p = taosAcquireRef(tsFileRsetId, tfd); void *p = taosAcquireRef(tsFileRsetId, tfd);
if (p == NULL) return -1; if (p == NULL) return -1;
......
...@@ -34,8 +34,6 @@ extern int32_t wDebugFlag; ...@@ -34,8 +34,6 @@ extern int32_t wDebugFlag;
#define WAL_PREFIX "wal" #define WAL_PREFIX "wal"
#define WAL_PREFIX_LEN 3 #define WAL_PREFIX_LEN 3
#define WAL_REFRESH_MS 1000 #define WAL_REFRESH_MS 1000
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + 16)
#define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFE))
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12) #define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN (WAL_PATH_LEN + 32) #define WAL_FILE_LEN (WAL_PATH_LEN + 32)
#define WAL_FILE_NUM 1 // 3 #define WAL_FILE_NUM 1 // 3
......
...@@ -112,7 +112,7 @@ void walRemoveAllOldFiles(void *handle) { ...@@ -112,7 +112,7 @@ void walRemoveAllOldFiles(void *handle) {
pthread_mutex_unlock(&pWal->mutex); pthread_mutex_unlock(&pWal->mutex);
} }
#if defined(WAL_CHECKSUM_WHOLE)
static void walUpdateChecksum(SWalHead *pHead) { static void walUpdateChecksum(SWalHead *pHead) {
pHead->sver = 2; pHead->sver = 2;
...@@ -123,17 +123,16 @@ static void walUpdateChecksum(SWalHead *pHead) { ...@@ -123,17 +123,16 @@ static void walUpdateChecksum(SWalHead *pHead) {
static int walValidateChecksum(SWalHead *pHead) { static int walValidateChecksum(SWalHead *pHead) {
if (pHead->sver == 0) { // for compatible with wal before sver 1 if (pHead->sver == 0) { // for compatible with wal before sver 1
return taosCheckChecksumWhole((uint8_t *)pHead, sizeof(*pHead)); return taosCheckChecksumWhole((uint8_t *)pHead, sizeof(*pHead));
} else if (pHead->sver >= 1) { } else {
// new wal format
uint32_t cksum = pHead->cksum; uint32_t cksum = pHead->cksum;
pHead->cksum = 0; pHead->cksum = 0;
return taosCheckChecksum((uint8_t *)pHead, sizeof(*pHead) + pHead->len, cksum); int ret = taosCheckChecksum((uint8_t *)pHead, sizeof(*pHead) + pHead->len, cksum);
pHead->cksum = cksum; // must restore cksum for next call walValiteCheckSum
return ret;
} }
return 0;
} }
#endif
int32_t walWrite(void *handle, SWalHead *pHead) { int32_t walWrite(void *handle, SWalHead *pHead) {
if (handle == NULL) return -1; if (handle == NULL) return -1;
...@@ -275,38 +274,77 @@ static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd, ...@@ -275,38 +274,77 @@ static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd,
continue; continue;
} }
#if defined(WAL_CHECKSUM_WHOLE) if (pHead->sver == 0) {
if (pHead->sver == 0 && walValidateChecksum(pHead)) { // old wal file format, only check head data crc
if (walValidateChecksum(pHead)) {
wInfo("vgId:%d, wal head cksum check passed, offset:%" PRId64, pWal->vgId, pos); wInfo("vgId:%d, wal head cksum check passed, offset:%" PRId64, pWal->vgId, pos);
*offset = pos; *offset = pos;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} else {
if (pHead->sver >= 1) { // maybe new wal file format, read body data and check head + body crc
if (tfRead(tfd, pHead->cont, pHead->len) < pHead->len) { if (tfRead(tfd, pHead->cont, pHead->len) < pHead->len) {
wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos); wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos);
return TSDB_CODE_WAL_FILE_CORRUPTED; return TSDB_CODE_WAL_FILE_CORRUPTED;
} }
// check head + body crc
if (walValidateChecksum(pHead)) { if (walValidateChecksum(pHead)) {
wInfo("vgId:%d, wal whole cksum check passed, offset:%" PRId64, pWal->vgId, pos); wInfo("vgId:%d, wal whole cksum check passed, offset:%" PRId64, pWal->vgId, pos);
*offset = pos; *offset = pos;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
}
#else return TSDB_CODE_WAL_FILE_CORRUPTED;
if (taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) { }
static int32_t walSkipOldCorrupted(SWal *pWal, SWalHead *pHead, int64_t tfd, int64_t *offset) {
int64_t pos = *offset;
while (1) {
pos++;
if (tfLseek(tfd, pos, SEEK_SET) < 0) {
wError("vgId:%d, failed to seek from corrupted wal file since %s", pWal->vgId, strerror(errno));
return TSDB_CODE_WAL_FILE_CORRUPTED;
}
if (tfRead(tfd, pHead, sizeof(SWalHead)) <= 0) {
wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos);
return TSDB_CODE_WAL_FILE_CORRUPTED;
}
if (pHead->signature != WAL_SIGNATURE) {
continue;
}
if (pHead->sver == 0 && walValidateChecksum(pHead)) {
wInfo("vgId:%d, wal head cksum check passed, offset:%" PRId64, pWal->vgId, pos); wInfo("vgId:%d, wal head cksum check passed, offset:%" PRId64, pWal->vgId, pos);
*offset = pos; *offset = pos;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
#endif if (pHead->sver >= 1) {
if (tfRead(tfd, pHead->cont, pHead->len) < pHead->len) {
wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos);
return TSDB_CODE_WAL_FILE_CORRUPTED;
}
if (walValidateChecksum(pHead)) {
wInfo("vgId:%d, wal whole cksum check passed, offset:%" PRId64, pWal->vgId, pos);
*offset = pos;
return TSDB_CODE_SUCCESS;
}
}
} }
return TSDB_CODE_WAL_FILE_CORRUPTED; return TSDB_CODE_WAL_FILE_CORRUPTED;
} }
// Add SMemRowType ahead of SDataRow // Add SMemRowType ahead of SDataRow
static void expandSubmitBlk(SSubmitBlk *pDest, SSubmitBlk *pSrc, int32_t *lenExpand) { static void expandSubmitBlk(SSubmitBlk *pDest, SSubmitBlk *pSrc, int32_t *lenExpand) {
// copy the header firstly // copy the header firstly
...@@ -454,9 +492,9 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch ...@@ -454,9 +492,9 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
break; break;
} }
#if defined(WAL_CHECKSUM_WHOLE) // sver == 0 is old wal format, other is new wal
if ((pHead->sver == 0 && !walValidateChecksum(pHead)) || pHead->sver < 0 || pHead->sver > 2) { if (pHead->sver == 0 && !walValidateChecksum(pHead)) {
wError("vgId:%d, file:%s, wal head cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name, wError("vgId:%d, file:%s, old wal head cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
pHead->version, pHead->len, offset); pHead->version, pHead->len, offset);
code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset); code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -465,7 +503,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch ...@@ -465,7 +503,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
} }
} }
if (pHead->len < 0 || pHead->len > size - sizeof(SWalHead)) { if ( pHead->sver == 0 && (pHead->len < 0 || pHead->len > size - sizeof(SWalHead))) {
wError("vgId:%d, file:%s, wal head len out of range, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name, wError("vgId:%d, file:%s, wal head len out of range, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
pHead->version, pHead->len, offset); pHead->version, pHead->len, offset);
code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset); code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
...@@ -488,7 +526,9 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch ...@@ -488,7 +526,9 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
continue; continue;
} }
if ((pHead->sver >= 1) && !walValidateChecksum(pHead)) { // check new wal sum head + body crc
if ((pHead->sver != 0) && !walValidateChecksum(pHead)) {
// new format wal corrupted
wError("vgId:%d, file:%s, wal whole cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name, wError("vgId:%d, file:%s, wal whole cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
pHead->version, pHead->len, offset); pHead->version, pHead->len, offset);
code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset); code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
...@@ -498,41 +538,6 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch ...@@ -498,41 +538,6 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
} }
} }
#else
if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) {
wError("vgId:%d, file:%s, wal head cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
pHead->version, pHead->len, offset);
code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
if (code != TSDB_CODE_SUCCESS) {
walFtruncate(pWal, tfd, offset);
break;
}
}
if (pHead->len < 0 || pHead->len > size - sizeof(SWalHead)) {
wError("vgId:%d, file:%s, wal head len out of range, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
pHead->version, pHead->len, offset);
code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
if (code != TSDB_CODE_SUCCESS) {
walFtruncate(pWal, tfd, offset);
break;
}
}
ret = (int32_t)tfRead(tfd, pHead->cont, pHead->len);
if (ret < 0) {
wError("vgId:%d, file:%s, failed to read wal body since %s", pWal->vgId, name, strerror(errno));
code = TAOS_SYSTEM_ERROR(errno);
break;
}
if (ret < pHead->len) {
wError("vgId:%d, file:%s, failed to read wal body, ret:%d len:%d", pWal->vgId, name, ret, pHead->len);
offset += sizeof(SWalHead);
continue;
}
#endif
offset = offset + sizeof(SWalHead) + pHead->len; offset = offset + sizeof(SWalHead) + pHead->len;
wTrace("vgId:%d, restore wal, fileId:%" PRId64 " hver:%" PRIu64 " wver:%" PRIu64 " len:%d offset:%" PRId64, wTrace("vgId:%d, restore wal, fileId:%" PRId64 " hver:%" PRIu64 " wver:%" PRIu64 " len:%d offset:%" PRId64,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册