diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 67e2d43c98c11a24e348b5102377367024fa0286..17b2d24e909ff1a62f7c14b7ec658626b6deb159 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -246,6 +246,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_ENABLED, 0, 0x0901, "Sync modul // wal TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, 0, 0x1000, "Unexpected generic error in wal") +TAOS_DEFINE_ERROR(TSDB_CODE_WAL_FILE_CORRUPTED, 0, 0x1001, "WAL file is corrupted") // http TAOS_DEFINE_ERROR(TSDB_CODE_HTTP_SERVER_OFFLINE, 0, 0x1100, "http server is not onlin") diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 646c17b2b8b64344afb7c5117f2bd83b3da89dab..4c672eb557427d76f0989a4d8975eb7f7856c39e 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -185,7 +185,11 @@ static int32_t sdbInitWal() { } sdbInfo("open sdb wal for restore"); - walRestore(tsSdbObj.wal, NULL, sdbWrite); + int code = walRestore(tsSdbObj.wal, NULL, sdbWrite); + if (code != TSDB_CODE_SUCCESS) { + sdbError("failed to open wal for restore, reason:%s", tstrerror(code)); + return -1; + } return 0; } diff --git a/src/wal/src/walMain.c b/src/wal/src/walMain.c index bebad69f3224e70efb795dad51e77745ea3053e4..4987ba211667f60bc7a0ece60fc569768fd89256 100644 --- a/src/wal/src/walMain.c +++ b/src/wal/src/walMain.c @@ -347,9 +347,10 @@ static void walRelease(SWal *pWal) { static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) { char *name = pWal->name; + int size = 1024 * 1024; // default 1M buffer size terrno = 0; - char *buffer = malloc(1024000); // size for one record + char *buffer = malloc(size); if (buffer == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); return terrno; @@ -357,7 +358,7 @@ static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) { SWalHead *pHead = (SWalHead *)buffer; - int fd = open(name, O_RDONLY); + int fd = open(name, O_RDWR); if (fd < 0) { wError("wal:%s, failed to open for restore(%s)", name, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); @@ -367,29 +368,58 @@ static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) { wDebug("wal:%s, start to restore", name); + size_t offset = 0; while (1) { int ret = taosTRead(fd, pHead, sizeof(SWalHead)); - if ( ret == 0) break; + if (ret == 0) break; - if (ret != sizeof(SWalHead)) { - wWarn("wal:%s, failed to read head, skip, ret:%d(%s)", name, ret, strerror(errno)); + if (ret < 0) { + wError("wal:%s, failed to read wal head part since %s", name, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); break; } + if (ret < sizeof(SWalHead)) { + wError("wal:%s, failed to read head, ret:%d, skip the rest of file", name, ret); + taosFtruncate(fd, offset); + fsync(fd); + break; + } + if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) { wWarn("wal:%s, cksum is messed up, skip the rest of file", name); - terrno = TAOS_SYSTEM_ERROR(errno); + terrno = TSDB_CODE_WAL_FILE_CORRUPTED; + ASSERT(false); break; - } + } + + if (pHead->len > size - sizeof(SWalHead)) { + size = sizeof(SWalHead) + pHead->len; + buffer = realloc(buffer, size); + if (buffer == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + break; + } + + pHead = (SWalHead *)buffer; + } ret = taosTRead(fd, pHead->cont, pHead->len); - if ( ret != pHead->len) { - wWarn("wal:%s, failed to read body, skip, len:%d ret:%d", name, pHead->len, ret); + if (ret < 0) { + wError("wal:%s failed to read wal body part since %s", name, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); break; } + if (ret < pHead->len) { + wError("wal:%s, failed to read body, len:%d ret:%d, skip the rest of file", name, pHead->len, ret); + taosFtruncate(fd, offset); + fsync(fd); + break; + } + + offset = offset + sizeof(SWalHead) + pHead->len; + if (pWal->keep) pWal->version = pHead->version; (*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL); }