提交 987e92cd 编写于 作者: A Alex Duan

fix(wal): deal with read context after found next valid item

上级 516caf08
...@@ -224,6 +224,8 @@ static int32_t syncReadOneWalRecord(int32_t sfd, SWalHead *pHead) { ...@@ -224,6 +224,8 @@ static int32_t syncReadOneWalRecord(int32_t sfd, SWalHead *pHead) {
sError("sfd:%d, wal corrupted and skip failed crc check, code:%d", sfd, code); sError("sfd:%d, wal corrupted and skip failed crc check, code:%d", sfd, code);
return -1; return -1;
} }
// found next valid item
if (pHead->sver != 0) return sizeof(SWalHead) + pHead->len;
} }
if (pHead->len < 0 || pHead->len > WAL_MAX_SIZE - sizeof(SWalHead)) { if (pHead->len < 0 || pHead->len > WAL_MAX_SIZE - sizeof(SWalHead)) {
...@@ -233,6 +235,8 @@ static int32_t syncReadOneWalRecord(int32_t sfd, SWalHead *pHead) { ...@@ -233,6 +235,8 @@ static int32_t syncReadOneWalRecord(int32_t sfd, SWalHead *pHead) {
sError("sfd:%d, wal corrupted and skip failed length check, code:%d", sfd, code); sError("sfd:%d, wal corrupted and skip failed length check, code:%d", sfd, code);
return -1; return -1;
} }
// found next valid item
if (pHead->sver != 0) return sizeof(SWalHead) + pHead->len;
} }
// read body // read body
......
...@@ -445,6 +445,8 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch ...@@ -445,6 +445,8 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
break; break;
} }
bool contAlreadyRead = false;
// sver == 0 is old wal format, other is new wal // sver == 0 is old wal format, other is new wal
if (pHead->sver == 0 && !walValidateChecksum(pHead)) { if (pHead->sver == 0 && !walValidateChecksum(pHead)) {
wError("vgId:%d, file:%s, old wal head cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name, wError("vgId:%d, file:%s, old wal head cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
...@@ -454,6 +456,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch ...@@ -454,6 +456,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
walFtruncate(pWal, tfd, offset); walFtruncate(pWal, tfd, offset);
break; break;
} }
if (pHead->sver != 0) contAlreadyRead = true;
} }
if ( pHead->sver == 0 && (pHead->len < 0 || pHead->len > size - sizeof(SWalHead))) { if ( pHead->sver == 0 && (pHead->len < 0 || pHead->len > size - sizeof(SWalHead))) {
...@@ -464,30 +467,33 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch ...@@ -464,30 +467,33 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
walFtruncate(pWal, tfd, offset); walFtruncate(pWal, tfd, offset);
break; break;
} }
if (pHead->sver != 0) contAlreadyRead = true;
} }
ret = (int32_t)tfRead(tfd, pHead->cont, pHead->len); if (!contAlreadyRead) {
if (ret < 0) { ret = (int32_t)tfRead(tfd, pHead->cont, pHead->len);
wError("vgId:%d, file:%s, failed to read wal body since %s", pWal->vgId, name, strerror(errno)); if (ret < 0) {
code = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, file:%s, failed to read wal body since %s", pWal->vgId, name, strerror(errno));
break; code = TAOS_SYSTEM_ERROR(errno);
} break;
}
if (ret < pHead->len) { if (ret < pHead->len) {
wError("vgId:%d, file:%s, failed to read wal body, ret:%d len:%d", pWal->vgId, name, ret, pHead->len); wError("vgId:%d, file:%s, failed to read wal body, ret:%d len:%d", pWal->vgId, name, ret, pHead->len);
offset += sizeof(SWalHead); offset += sizeof(SWalHead);
continue; continue;
} }
// check new wal sum head + body crc // check new wal sum head + body crc
if ((pHead->sver != 0) && !walValidateChecksum(pHead)) { if ((pHead->sver != 0) && !walValidateChecksum(pHead)) {
// new format wal corrupted // new format wal corrupted
wError("vgId:%d, file:%s, wal whole cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name, wError("vgId:%d, file:%s, wal whole cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId,
pHead->version, pHead->len, offset); name, pHead->version, pHead->len, offset);
code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset); code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
walFtruncate(pWal, tfd, offset); walFtruncate(pWal, tfd, offset);
break; break;
}
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册