未验证 提交 e92e7d98 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #4079 from taosdata/feature/wal

TD-1891
...@@ -189,6 +189,40 @@ int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) { ...@@ -189,6 +189,40 @@ int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) {
return code; return code;
} }
static void walFtruncate(SWal *pWal, int32_t fd, int64_t offset) {
taosFtruncate(fd, offset);
fsync(fd);
}
static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int32_t fd, int64_t *offset) {
int64_t pos = *offset;
while (1) {
pos++;
if (lseek(fd, pos, SEEK_SET) < 0) {
wError("vgId:%d, failed to seek from corrupted wal file since %s", pWal->vgId, strerror(errno));
return TSDB_CODE_WAL_FILE_CORRUPTED;
}
if (taosTRead(fd, pHead, sizeof(SWalHead)) <= 0) {
wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos);
return TSDB_CODE_WAL_FILE_CORRUPTED;
}
if (pHead->signature != WAL_SIGNATURE) {
continue;
}
if (taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) {
wInfo("vgId:%d, wal head cksum check passed, offset:%" PRId64, pWal->vgId, pos);
*offset = pos;
return TSDB_CODE_SUCCESS;
}
}
return TSDB_CODE_WAL_FILE_CORRUPTED;
}
static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name) { static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, char *name) {
int32_t size = WAL_MAX_SIZE; int32_t size = WAL_MAX_SIZE;
void * buffer = tmalloc(size); void * buffer = tmalloc(size);
...@@ -222,16 +256,18 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch ...@@ -222,16 +256,18 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
if (ret < sizeof(SWalHead)) { if (ret < sizeof(SWalHead)) {
wError("vgId:%d, file:%s, failed to read wal head, ret is %d", pWal->vgId, name, ret); wError("vgId:%d, file:%s, failed to read wal head, ret is %d", pWal->vgId, name, ret);
taosFtruncate(fd, offset); walFtruncate(pWal, fd, offset);
fsync(fd);
break; break;
} }
if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) { if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) {
wError("vgId:%d, file:%s, wal head cksum is messed up, offset:%" PRId64, pWal->vgId, name, offset); wError("vgId:%d, file:%s, wal head cksum is messed up, ver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name,
code = TSDB_CODE_WAL_FILE_CORRUPTED; pHead->version, pHead->len, offset);
ASSERT(false); code = walSkipCorruptedRecord(pWal, pHead, fd, &offset);
break; if (code != TSDB_CODE_SUCCESS) {
walFtruncate(pWal, fd, offset);
break;
}
} }
if (pHead->len > size - sizeof(SWalHead)) { if (pHead->len > size - sizeof(SWalHead)) {
...@@ -255,9 +291,8 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch ...@@ -255,9 +291,8 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
if (ret < pHead->len) { if (ret < pHead->len) {
wError("vgId:%d, file:%s, failed to read wal body, ret:%d len:%d", pWal->vgId, name, ret, pHead->len); wError("vgId:%d, file:%s, failed to read wal body, ret:%d len:%d", pWal->vgId, name, ret, pHead->len);
taosFtruncate(fd, offset); offset += sizeof(SWalHead);
fsync(fd); continue;
break;
} }
offset = offset + sizeof(SWalHead) + pHead->len; offset = offset + sizeof(SWalHead) + pHead->len;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册