提交 03fbe89e 编写于 作者: S Shengliang Guan

TD-1891

上级 cf15f653
...@@ -189,6 +189,40 @@ int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) { ...@@ -189,6 +189,40 @@ int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) {
return code; return code;
} }
static void walFtruncate(SWal *pWal, int32_t fd, int64_t offset) {
taosFtruncate(fd, offset);
fsync(fd);
}
static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int32_t fd, int64_t *offset) {
int64_t pos = *offset;
while (1) {
pos++;
if (lseek(fd, pos, SEEK_SET) < 0) {
wError("vgId:%d, failed to seek from corrupted wal file since %s", pWal->vgId, strerror(errno));
return TSDB_CODE_WAL_FILE_CORRUPTED;
}
if (taosTRead(fd, pHead, sizeof(SWalHead)) <= 0) {
wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos);
return TSDB_CODE_WAL_FILE_CORRUPTED;
}
if (pHead->signature != WAL_SIGNATURE) {
continue;
}
if (taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) {
wInfo("vgId:%d, wal head cksum check passed, offset:%" PRId64, pWal->vgId, pos);
*offset = pos;
return TSDB_CODE_SUCCESS;
}
}
return TSDB_CODE_WAL_FILE_CORRUPTED;
}
static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name) { static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name) {
int32_t size = WAL_MAX_SIZE; int32_t size = WAL_MAX_SIZE;
void * buffer = tmalloc(size); void * buffer = tmalloc(size);
...@@ -222,17 +256,19 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch ...@@ -222,17 +256,19 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
if (ret < sizeof(SWalHead)) { if (ret < sizeof(SWalHead)) {
wError("vgId:%d, file:%s, failed to read wal head, ret is %d", pWal->vgId, name, ret); wError("vgId:%d, file:%s, failed to read wal head, ret is %d", pWal->vgId, name, ret);
taosFtruncate(fd, offset); walFtruncate(pWal, fd, offset);
fsync(fd);
break; break;
} }
if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) { if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) {
wError("vgId:%d, file:%s, wal head cksum is messed up, offset:%" PRId64, pWal->vgId, name, offset); wError("vgId:%d, file:%s, wal head cksum is messed up, ver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
code = TSDB_CODE_WAL_FILE_CORRUPTED; pHead->version, pHead->len, offset);
ASSERT(false); code = walSkipCorruptedRecord(pWal, pHead, fd, &offset);
if (code != TSDB_CODE_SUCCESS) {
walFtruncate(pWal, fd, offset);
break; break;
} }
}
if (pHead->len > size - sizeof(SWalHead)) { if (pHead->len > size - sizeof(SWalHead)) {
size = sizeof(SWalHead) + pHead->len; size = sizeof(SWalHead) + pHead->len;
...@@ -255,9 +291,8 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch ...@@ -255,9 +291,8 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
if (ret < pHead->len) { if (ret < pHead->len) {
wError("vgId:%d, file:%s, failed to read wal body, ret:%d len:%d", pWal->vgId, name, ret, pHead->len); wError("vgId:%d, file:%s, failed to read wal body, ret:%d len:%d", pWal->vgId, name, ret, pHead->len);
taosFtruncate(fd, offset); offset += sizeof(SWalHead);
fsync(fd); continue;
break;
} }
offset = offset + sizeof(SWalHead) + pHead->len; offset = offset + sizeof(SWalHead) + pHead->len;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册