diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 5b8d70fb7c127d476566f54623c43783cd5a240a..14173690967ffd26be92cf13a05af6f7508533fe 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -77,11 +77,11 @@ typedef struct { } SWalSyncInfo; typedef struct { - int8_t protoVer; int64_t version; - int16_t msgType; + int64_t ingestTs; int32_t bodyLen; - int64_t ingestTs; // not implemented + int16_t msgType; + int8_t protoVer; // sync meta SWalSyncInfo syncMeta; diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index a5fd3fca35c01b84f0b2c5b7cc4fe3b7bbf76662..a8da6809100fa5789e5b7e57e051631257782e1e 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -139,7 +139,7 @@ int walCheckAndRepairMeta(SWal* pWal) { const char* idxPattern = "^[0-9]+.idx$"; regex_t logRegPattern; regex_t idxRegPattern; - SArray* pLogInfoArray = taosArrayInit(8, sizeof(SWalFileInfo)); + SArray* actualLog = taosArrayInit(8, sizeof(SWalFileInfo)); regcomp(&logRegPattern, logPattern, REG_EXTENDED); regcomp(&idxRegPattern, idxPattern, REG_EXTENDED); @@ -159,7 +159,7 @@ int walCheckAndRepairMeta(SWal* pWal) { SWalFileInfo fileInfo; memset(&fileInfo, -1, sizeof(SWalFileInfo)); sscanf(name, "%" PRId64 ".log", &fileInfo.firstVer); - taosArrayPush(pLogInfoArray, &fileInfo); + taosArrayPush(actualLog, &fileInfo); } } @@ -167,10 +167,10 @@ int walCheckAndRepairMeta(SWal* pWal) { regfree(&logRegPattern); regfree(&idxRegPattern); - taosArraySort(pLogInfoArray, compareWalFileInfo); + taosArraySort(actualLog, compareWalFileInfo); int metaFileNum = taosArrayGetSize(pWal->fileInfoSet); - int actualFileNum = taosArrayGetSize(pLogInfoArray); + int actualFileNum = taosArrayGetSize(actualLog); #if 0 for (int32_t fileNo = actualFileNum - 1; fileNo >= 0; fileNo--) { @@ -196,11 +196,11 @@ int walCheckAndRepairMeta(SWal* pWal) { taosArrayPopFrontBatch(pWal->fileInfoSet, metaFileNum - actualFileNum); } else if (metaFileNum < actualFileNum) { for (int i = metaFileNum; i < actualFileNum; i++) { - SWalFileInfo* pFileInfo = taosArrayGet(pLogInfoArray, i); + SWalFileInfo* pFileInfo = taosArrayGet(actualLog, i); taosArrayPush(pWal->fileInfoSet, pFileInfo); } } - taosArrayDestroy(pLogInfoArray); + taosArrayDestroy(actualLog); pWal->writeCur = actualFileNum - 1; if (actualFileNum > 0) { @@ -221,7 +221,7 @@ int walCheckAndRepairMeta(SWal* pWal) { int code = walSaveMeta(pWal); if (code < 0) { - taosArrayDestroy(pLogInfoArray); + taosArrayDestroy(actualLog); return -1; } } diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index ac62b7d98dfeec5e7df073ef165a79d1ad95b7f6..787c9af31703df50a4b91589e20f5f9373d71c56 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -423,37 +423,38 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) { return 0; } -int32_t walReadVer(SWalReader *pRead, int64_t ver) { - wDebug("vgId:%d wal start to read ver %ld", pRead->pWal->cfg.vgId, ver); +int32_t walReadVer(SWalReader *pReader, int64_t ver) { + wDebug("vgId:%d wal start to read ver %ld", pReader->pWal->cfg.vgId, ver); int64_t contLen; + int32_t code; bool seeked = false; - if (pRead->pWal->vers.firstVer == -1) { + if (pReader->pWal->vers.firstVer == -1) { terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; return -1; } - if (ver > pRead->pWal->vers.lastVer || ver < pRead->pWal->vers.firstVer) { - wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pRead->pWal->cfg.vgId, - ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.lastVer); + if (ver > pReader->pWal->vers.lastVer || ver < pReader->pWal->vers.firstVer) { + wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pReader->pWal->cfg.vgId, + ver, pReader->pWal->vers.firstVer, pReader->pWal->vers.lastVer); terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; return -1; } - if (pRead->curInvalid || pRead->curVersion != ver) { - if (walReadSeekVer(pRead, ver) < 0) { - wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pRead->pWal->cfg.vgId, ver, terrstr()); + if (pReader->curInvalid || pReader->curVersion != ver) { + if (walReadSeekVer(pReader, ver) < 0) { + wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pReader->pWal->cfg.vgId, ver, terrstr()); return -1; } seeked = true; } while (1) { - contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead)); + contLen = taosReadFile(pReader->pLogFile, pReader->pHead, sizeof(SWalCkHead)); if (contLen == sizeof(SWalCkHead)) { break; } else if (contLen == 0 && !seeked) { - walReadSeekVerImpl(pRead, ver); + walReadSeekVerImpl(pReader, ver); seeked = true; continue; } else { @@ -467,26 +468,26 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) { } } - contLen = walValidHeadCksum(pRead->pHead); - if (contLen != 0) { - wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId, + code = walValidHeadCksum(pReader->pHead); + if (code != 0) { + wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since head checksum not passed", pReader->pWal->cfg.vgId, ver); terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } - if (pRead->capacity < pRead->pHead->head.bodyLen) { - void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pRead->pHead->head.bodyLen); + if (pReader->capacity < pReader->pHead->head.bodyLen) { + void *ptr = taosMemoryRealloc(pReader->pHead, sizeof(SWalCkHead) + pReader->pHead->head.bodyLen); if (ptr == NULL) { terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; return -1; } - pRead->pHead = ptr; - pRead->capacity = pRead->pHead->head.bodyLen; + pReader->pHead = ptr; + pReader->capacity = pReader->pHead->head.bodyLen; } - if ((contLen = taosReadFile(pRead->pLogFile, pRead->pHead->head.body, pRead->pHead->head.bodyLen)) != - pRead->pHead->head.bodyLen) { + if ((contLen = taosReadFile(pReader->pLogFile, pReader->pHead->head.body, pReader->pHead->head.bodyLen)) != + pReader->pHead->head.bodyLen) { if (contLen < 0) terrno = TAOS_SYSTEM_ERROR(errno); else { @@ -496,25 +497,28 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) { return -1; } - if (pRead->pHead->head.version != ver) { - wError("vgId:%d, unexpected wal log, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId, - pRead->pHead->head.version, ver); - pRead->curInvalid = 1; + if (pReader->pHead->head.version != ver) { + wError("vgId:%d, unexpected wal log, index:%" PRId64 ", read request index:%" PRId64, pReader->pWal->cfg.vgId, + pReader->pHead->head.version, ver); + pReader->curInvalid = 1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; ASSERT(0); return -1; } - contLen = walValidBodyCksum(pRead->pHead); - if (contLen != 0) { - wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, + code = walValidBodyCksum(pReader->pHead); + if (code != 0) { + wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since body checksum not passed", pReader->pWal->cfg.vgId, ver); - pRead->curInvalid = 1; + uint32_t readCkSum = walCalcBodyCksum(pReader->pHead->head.body, pReader->pHead->head.bodyLen); + uint32_t logCkSum = pReader->pHead->cksumBody; + wError("checksum written into log: %u, checksum calculated: %u", logCkSum, readCkSum); + pReader->curInvalid = 1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; ASSERT(0); return -1; } - pRead->curVersion++; + pReader->curVersion++; return 0; } diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 491e5b0e08ff190ed9b042170e942738a22e5f36..4eadc92f705da24df20555ffef1bf82d7fca9858 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -289,18 +289,25 @@ int32_t walEndSnapshot(SWal *pWal) { newTotSize -= iter->fileSize; } } - char fnameStr[WAL_FILE_LEN]; + int32_t actualDelete = 0; + char fnameStr[WAL_FILE_LEN]; // remove file for (int i = 0; i < deleteCnt; i++) { pInfo = taosArrayGet(pWal->fileInfoSet, i); walBuildLogName(pWal, pInfo->firstVer, fnameStr); - taosRemoveFile(fnameStr); + if (taosRemoveFile(fnameStr) < 0) { + goto UPDATE_META; + } walBuildIdxName(pWal, pInfo->firstVer, fnameStr); - taosRemoveFile(fnameStr); + if (taosRemoveFile(fnameStr) < 0) { + ASSERT(0); + } + actualDelete++; } + UPDATE_META: // make new array, remove files - taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt); + taosArrayPopFrontBatch(pWal->fileInfoSet, actualDelete); if (taosArrayGetSize(pWal->fileInfoSet) == 0) { pWal->writeCur = -1; pWal->vers.firstVer = -1;