未验证 提交 132d0ac1 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #17904 from taosdata/fix/TS-1992-2.6

fix(wal): fixed three problems about  corrupted wal would be repaired
......@@ -19,6 +19,9 @@
extern "C" {
#endif
#define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFE))
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + 16)
typedef enum {
TAOS_WAL_NOLOG = 0,
TAOS_WAL_WRITE = 1,
......
......@@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "tchecksum.h"
#include "tlog.h"
#include "tutil.h"
#include "tglobal.h"
......@@ -23,6 +24,7 @@
#include "tsocket.h"
#include "twal.h"
#include "tsync.h"
#include "tfile.h"
#include "syncInt.h"
static int32_t syncGetWalVersion(SSyncNode *pNode, SSyncPeer *pPeer) {
......@@ -134,8 +136,69 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
return 0;
}
static int syncValidateChecksum(SWalHead *pHead) {
if (pHead->sver == 0) { // for compatible with wal before sver 1
return taosCheckChecksumWhole((uint8_t *)pHead, sizeof(*pHead));
} else {
// new wal format
uint32_t cksum = pHead->cksum;
pHead->cksum = 0;
int ret = taosCheckChecksum((uint8_t *)pHead, sizeof(*pHead) + pHead->len, cksum);
pHead->cksum = cksum; // must restore cksum for next call walValiteCheckSum
return ret;
}
}
static int32_t syncSkipCorruptedRecord(SWalHead *pHead, int32_t fd) {
int64_t pos = taosLSeek(fd, 0, SEEK_CUR);
if (pos < 0) {
sError("fd:%d, skip corrupte taosLSeek return error. pos=%" PRId64, fd, pos);
return TSDB_CODE_WAL_FILE_CORRUPTED;
}
// save start bad bytes positioin
int64_t start = pos;
while (1) {
pos++;
if (taosLSeek(fd, pos, SEEK_SET) < 0) {
sError("fd:%d, failed to seek from corrupted wal file pos=%" PRId64 ".since %s", fd, pos, strerror(errno));
return TSDB_CODE_WAL_FILE_CORRUPTED;
}
if (fdRead(fd, pHead, sizeof(SWalHead)) <= 0) {
sError("fd:%d, failed to read wal head from corrupted wal. pos=%" PRId64 ".since %s", fd, pos, strerror(errno));
return TSDB_CODE_WAL_FILE_CORRUPTED;
}
if (pHead->signature != WAL_SIGNATURE) {
continue;
}
if (pHead->sver == 0) {
// old format wal, only check head crc
if (syncValidateChecksum(pHead)) {
sInfo("fd:%d, old wal read head ok, pHead->len=%d, skip bad bytes=%" PRId64 " right pos:%" PRId64, fd, pHead->len, pos - start, pos);
return TSDB_CODE_SUCCESS;
}
} else {
// new format wal, check head + body crc
if (fdRead(fd, pHead->cont, pHead->len) < pHead->len) {
sError("fd:%d, read to end of corrupted wal file, offset:%" PRId64, fd, pos);
return TSDB_CODE_WAL_FILE_CORRUPTED;
}
if (syncValidateChecksum(pHead)) {
sInfo("fd:%d, wal read head ok, pHead->len=%d, skip bad bytes=%" PRId64 " right pos:%" PRId64, fd, pHead->len, pos - start, pos);
return TSDB_CODE_SUCCESS;
}
}
}
return TSDB_CODE_WAL_FILE_CORRUPTED;
}
// if only a partial record is read out, upper layer will reload the file to get a complete record
static int32_t syncReadOneWalRecord(int32_t sfd, SWalHead *pHead) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t ret = read(sfd, pHead, sizeof(SWalHead));
if (ret < 0) {
sError("sfd:%d, failed to read wal head since %s, ret:%d", sfd, strerror(errno), ret);
......@@ -153,18 +216,48 @@ static int32_t syncReadOneWalRecord(int32_t sfd, SWalHead *pHead) {
return 0;
}
assert(pHead->len <= TSDB_MAX_WAL_SIZE);
// check wal head valid
if (pHead->sver == 0 && !syncValidateChecksum(pHead)) {
sError("sfd:%d, old wal head cksum is messed up, sver=%d version:%" PRIu64 " len:%d", sfd, pHead->sver, pHead->version, pHead->len);
code = syncSkipCorruptedRecord(pHead, sfd);
if (code != TSDB_CODE_SUCCESS) {
sError("sfd:%d, wal corrupted and skip failed crc check, code:%d", sfd, code);
return -1;
}
// found next valid item
if (pHead->sver != 0) return sizeof(SWalHead) + pHead->len;
}
if (pHead->len < 0 || pHead->len > WAL_MAX_SIZE - sizeof(SWalHead)) {
sError("sfd:%d, wal head len out of range, hver:%" PRIu64 " len:%d", sfd, pHead->version, pHead->len);
code = syncSkipCorruptedRecord(pHead, sfd);
if (code != TSDB_CODE_SUCCESS) {
sError("sfd:%d, wal corrupted and skip failed length check, code:%d", sfd, code);
return -1;
}
// found next valid item
if (pHead->sver != 0) return sizeof(SWalHead) + pHead->len;
}
ret = read(sfd, pHead->cont, pHead->len);
// read body
ret = (int32_t)read(sfd, pHead->cont, pHead->len);
if (ret < 0) {
sError("sfd:%d, failed to read wal content since %s, ret:%d", sfd, strerror(errno), ret);
sError("sfd:%d, wal read wal cont failed. read len=%d, ret:%d", sfd, pHead->len, ret);
return -1;
}
if (ret != pHead->len) {
// file is not at end yet, it shall be reloaded
sInfo("sfd:%d, a partial wal conetnt is read out, ret:%d", sfd, ret);
return 0;
if (ret < pHead->len) {
sError("sfd:%d, wal read wal cont length small. need read len=%d, ret len:%d", sfd, pHead->len, ret);
return -1;
}
if (pHead->sver != 0 && !syncValidateChecksum(pHead)) {
sError("sfd:%d, wal check sum failed, sver=%d version:%" PRIu64 " len:%d", sfd, pHead->sver, pHead->version, pHead->len);
code = syncSkipCorruptedRecord(pHead, sfd);
if (code != TSDB_CODE_SUCCESS) {
sError("sfd:%d, wal read body check sum not right and skip corrupted failed, code:%d", sfd, code);
return -1;
}
}
return sizeof(SWalHead) + pHead->len;
......
......@@ -33,6 +33,7 @@ int64_t tfOpenM(const char *pathname, int32_t flags, mode_t mode);
int64_t tfClose(int64_t tfd);
int64_t tfWrite(int64_t tfd, void *buf, int64_t count);
int64_t tfRead(int64_t tfd, void *buf, int64_t count);
int64_t fdRead(int32_t fd, void *buf, int64_t count);
int32_t tfFsync(int64_t tfd);
bool tfValid(int64_t tfd);
int64_t tfLseek(int64_t tfd, int64_t offset, int32_t whence);
......
......@@ -93,6 +93,13 @@ int64_t tfRead(int64_t tfd, void *buf, int64_t count) {
return ret;
}
int64_t fdRead(int32_t fd, void *buf, int64_t count) {
int64_t ret = taosRead(fd, buf, count);
if (ret < 0) terrno = TAOS_SYSTEM_ERROR(errno);
return ret;
}
int32_t tfFsync(int64_t tfd) {
void *p = taosAcquireRef(tsFileRsetId, tfd);
if (p == NULL) return -1;
......
......@@ -34,8 +34,6 @@ extern int32_t wDebugFlag;
#define WAL_PREFIX "wal"
#define WAL_PREFIX_LEN 3
#define WAL_REFRESH_MS 1000
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + 16)
#define WAL_SIGNATURE ((uint32_t)(0xFAFBFDFE))
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
#define WAL_FILE_NUM 1 // 3
......
......@@ -112,8 +112,6 @@ void walRemoveAllOldFiles(void *handle) {
pthread_mutex_unlock(&pWal->mutex);
}
#if defined(WAL_CHECKSUM_WHOLE)
static void walUpdateChecksum(SWalHead *pHead) {
pHead->sver = 2;
pHead->cksum = 0;
......@@ -123,17 +121,16 @@ static void walUpdateChecksum(SWalHead *pHead) {
static int walValidateChecksum(SWalHead *pHead) {
if (pHead->sver == 0) { // for compatible with wal before sver 1
return taosCheckChecksumWhole((uint8_t *)pHead, sizeof(*pHead));
} else if (pHead->sver >= 1) {
} else {
// new wal format
uint32_t cksum = pHead->cksum;
pHead->cksum = 0;
return taosCheckChecksum((uint8_t *)pHead, sizeof(*pHead) + pHead->len, cksum);
int ret = taosCheckChecksum((uint8_t *)pHead, sizeof(*pHead) + pHead->len, cksum);
pHead->cksum = cksum; // must restore cksum for next call walValiteCheckSum
return ret;
}
return 0;
}
#endif
int32_t walWrite(void *handle, SWalHead *pHead) {
if (handle == NULL) return -1;
......@@ -146,13 +143,8 @@ int32_t walWrite(void *handle, SWalHead *pHead) {
if (pHead->version <= pWal->version) return 0;
pHead->signature = WAL_SIGNATURE;
#if defined(WAL_CHECKSUM_WHOLE)
walUpdateChecksum(pHead);
#else
pHead->sver = 0;
taosCalcChecksumAppend(0, (uint8_t *)pHead, sizeof(SWalHead));
#endif
walUpdateChecksum(pHead);
int32_t contLen = pHead->len + sizeof(SWalHead);
pthread_mutex_lock(&pWal->mutex);
......@@ -275,38 +267,32 @@ static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd,
continue;
}
#if defined(WAL_CHECKSUM_WHOLE)
if (pHead->sver == 0 && walValidateChecksum(pHead)) {
wInfo("vgId:%d, wal head cksum check passed, offset:%" PRId64, pWal->vgId, pos);
*offset = pos;
return TSDB_CODE_SUCCESS;
}
if (pHead->sver >= 1) {
if (pHead->sver == 0) {
// old wal file format, only check head data crc
if (walValidateChecksum(pHead)) {
wInfo("vgId:%d, wal head cksum check passed, offset:%" PRId64, pWal->vgId, pos);
*offset = pos;
return TSDB_CODE_SUCCESS;
}
} else {
// maybe new wal file format, read body data and check head + body crc
if (tfRead(tfd, pHead->cont, pHead->len) < pHead->len) {
wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos);
return TSDB_CODE_WAL_FILE_CORRUPTED;
wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos);
return TSDB_CODE_WAL_FILE_CORRUPTED;
}
// check head + body crc
if (walValidateChecksum(pHead)) {
wInfo("vgId:%d, wal whole cksum check passed, offset:%" PRId64, pWal->vgId, pos);
*offset = pos;
return TSDB_CODE_SUCCESS;
wInfo("vgId:%d, wal whole cksum check passed, offset:%" PRId64, pWal->vgId, pos);
*offset = pos;
return TSDB_CODE_SUCCESS;
}
}
#else
if (taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) {
wInfo("vgId:%d, wal head cksum check passed, offset:%" PRId64, pWal->vgId, pos);
*offset = pos;
return TSDB_CODE_SUCCESS;
}
#endif
}
return TSDB_CODE_WAL_FILE_CORRUPTED;
}
// Add SMemRowType ahead of SDataRow
static void expandSubmitBlk(SSubmitBlk *pDest, SSubmitBlk *pSrc, int32_t *lenExpand) {
// copy the header firstly
......@@ -454,18 +440,21 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
break;
}
#if defined(WAL_CHECKSUM_WHOLE)
if ((pHead->sver == 0 && !walValidateChecksum(pHead)) || pHead->sver < 0 || pHead->sver > 2) {
wError("vgId:%d, file:%s, wal head cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
bool contAlreadyRead = false;
// sver == 0 is old wal format, other is new wal
if (pHead->sver == 0 && !walValidateChecksum(pHead)) {
wError("vgId:%d, file:%s, old wal head cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
pHead->version, pHead->len, offset);
code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
if (code != TSDB_CODE_SUCCESS) {
walFtruncate(pWal, tfd, offset);
break;
}
if (pHead->sver != 0) contAlreadyRead = true;
}
if (pHead->len < 0 || pHead->len > size - sizeof(SWalHead)) {
if ( pHead->sver == 0 && (pHead->len < 0 || pHead->len > size - sizeof(SWalHead))) {
wError("vgId:%d, file:%s, wal head len out of range, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
pHead->version, pHead->len, offset);
code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
......@@ -473,66 +462,36 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
walFtruncate(pWal, tfd, offset);
break;
}
if (pHead->sver != 0) contAlreadyRead = true;
}
ret = (int32_t)tfRead(tfd, pHead->cont, pHead->len);
if (ret < 0) {
wError("vgId:%d, file:%s, failed to read wal body since %s", pWal->vgId, name, strerror(errno));
code = TAOS_SYSTEM_ERROR(errno);
break;
}
if (ret < pHead->len) {
wError("vgId:%d, file:%s, failed to read wal body, ret:%d len:%d", pWal->vgId, name, ret, pHead->len);
offset += sizeof(SWalHead);
continue;
}
if ((pHead->sver >= 1) && !walValidateChecksum(pHead)) {
wError("vgId:%d, file:%s, wal whole cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
pHead->version, pHead->len, offset);
code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
if (code != TSDB_CODE_SUCCESS) {
walFtruncate(pWal, tfd, offset);
if (!contAlreadyRead) {
ret = (int32_t)tfRead(tfd, pHead->cont, pHead->len);
if (ret < 0) {
wError("vgId:%d, file:%s, failed to read wal body since %s", pWal->vgId, name, strerror(errno));
code = TAOS_SYSTEM_ERROR(errno);
break;
}
}
#else
if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) {
wError("vgId:%d, file:%s, wal head cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
pHead->version, pHead->len, offset);
code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
if (code != TSDB_CODE_SUCCESS) {
walFtruncate(pWal, tfd, offset);
break;
if (ret < pHead->len) {
wError("vgId:%d, file:%s, failed to read wal body, ret:%d len:%d", pWal->vgId, name, ret, pHead->len);
offset += sizeof(SWalHead);
continue;
}
}
if (pHead->len < 0 || pHead->len > size - sizeof(SWalHead)) {
wError("vgId:%d, file:%s, wal head len out of range, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
pHead->version, pHead->len, offset);
code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
if (code != TSDB_CODE_SUCCESS) {
walFtruncate(pWal, tfd, offset);
break;
// check new wal sum head + body crc
if ((pHead->sver != 0) && !walValidateChecksum(pHead)) {
// new format wal corrupted
wError("vgId:%d, file:%s, wal whole cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId,
name, pHead->version, pHead->len, offset);
code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
if (code != TSDB_CODE_SUCCESS) {
walFtruncate(pWal, tfd, offset);
break;
}
}
}
ret = (int32_t)tfRead(tfd, pHead->cont, pHead->len);
if (ret < 0) {
wError("vgId:%d, file:%s, failed to read wal body since %s", pWal->vgId, name, strerror(errno));
code = TAOS_SYSTEM_ERROR(errno);
break;
}
if (ret < pHead->len) {
wError("vgId:%d, file:%s, failed to read wal body, ret:%d len:%d", pWal->vgId, name, ret, pHead->len);
offset += sizeof(SWalHead);
continue;
}
#endif
offset = offset + sizeof(SWalHead) + pHead->len;
wTrace("vgId:%d, restore wal, fileId:%" PRId64 " hver:%" PRIu64 " wver:%" PRIu64 " len:%d offset:%" PRId64,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册