/*
 * Copyright (c) 2019 TAOS Data, Inc.
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "cJSON.h"
#include "os.h"
#include "taoserror.h"
#include "tutil.h"
#include "walInt.h"

/* True when the WAL is non-empty and `ver` lies within [firstVer, lastVer]. */
bool FORCE_INLINE walLogExist(SWal* pWal, int64_t ver) {
  return !walIsEmpty(pWal) && walGetFirstVer(pWal) <= ver && walGetLastVer(pWal) >= ver;
}

/* True when no first version has been recorded (firstVer == -1). */
bool FORCE_INLINE walIsEmpty(SWal* pWal) { return pWal->vers.firstVer == -1; }

/* Plain accessors for the version bookkeeping kept in pWal->vers. */
int64_t FORCE_INLINE walGetFirstVer(SWal* pWal) { return pWal->vers.firstVer; }

int64_t FORCE_INLINE walGetSnaphostVer(SWal* pWal) { return pWal->vers.snapshotVer; }

int64_t FORCE_INLINE walGetLastVer(SWal* pWal) { return pWal->vers.lastVer; }

int64_t FORCE_INLINE walGetCommittedVer(SWal* pWal) { return pWal->vers.commitVer; }

int64_t FORCE_INLINE walGetAppliedVer(SWal* pWal) { return pWal->vers.appliedVer; }

/*
 * Write "<path>/meta-ver<metaVer>" into `buf`.
 * `buf` must hold WAL_FILE_LEN bytes (every caller in this file passes a char[WAL_FILE_LEN]).
 * Returns the snprintf result.
 */
static FORCE_INLINE int walBuildMetaName(SWal* pWal, int metaVer, char* buf) {
  return snprintf(buf, WAL_FILE_LEN, "%s/meta-ver%d", pWal->path, metaVer);
}

/*
 * Make sure the first `need` bytes of the current scan window are buffered.
 * The window starts at file position `winBegin`; *pLen bytes are buffered in a
 * buffer of *pCap bytes. Missing bytes are read from the file, growing (and
 * possibly moving) the buffer. Returns 0 on success, -1 with terrno set.
 */
static int32_t walScanBufEnsure(TdFilePtr pFile, int64_t winBegin, char** ppBuf, int64_t* pCap, int64_t* pLen,
                                int64_t need) {
  if (need <= *pLen) return 0;

  if (need > *pCap) {
    char* pNew = taosMemoryRealloc(*ppBuf, need);
    if (pNew == NULL) {
      terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
      return -1;
    }
    *ppBuf = pNew;
    *pCap = need;
  }

  int64_t extra = need - *pLen;
  if (taosLSeekFile(pFile, winBegin + *pLen, SEEK_SET) < 0 || taosReadFile(pFile, *ppBuf + *pLen, extra) != extra) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }
  *pLen = need;
  return 0;
}

/*
 * Check whether a complete entry with valid head and body checksums starts at
 * byte `pos` of the scan window (file position winBegin + pos).
 * An entry that would extend past `fileSize` is treated as invalid: it is a
 * torn write. Bytes beyond the buffered window are fetched from the file, so
 * the checksum routines never read outside the buffer.
 *
 * Returns 1 if valid (and stores the entry's end position and version),
 * 0 if not a valid entry, -1 on I/O or memory error (terrno set).
 */
static int32_t walScanCheckEntry(TdFilePtr pFile, int64_t fileSize, int64_t winBegin, int64_t pos, char** ppBuf,
                                 int64_t* pCap, int64_t* pLen, int64_t* pEnd, int64_t* pVer) {
  const int64_t headSize = (int64_t)sizeof(SWalCkHead);
  int64_t       begin = winBegin + pos;

  if (begin + headSize > fileSize) return 0;
  if (walScanBufEnsure(pFile, winBegin, ppBuf, pCap, pLen, pos + headSize) != 0) return -1;

  SWalCkHead* pHead = (SWalCkHead*)(*ppBuf + pos);
  if (walValidHeadCksum(pHead) != 0) return 0;

  // bodyLen is only trusted after the head checksum has been verified
  if (pHead->head.bodyLen < 0) return 0;
  int64_t entrySize = headSize + (int64_t)pHead->head.bodyLen;
  if (begin + entrySize > fileSize) return 0;
  if (walScanBufEnsure(pFile, winBegin, ppBuf, pCap, pLen, pos + entrySize) != 0) return -1;

  pHead = (SWalCkHead*)(*ppBuf + pos);  // the buffer may have been reallocated
  if (walValidBodyCksum(pHead) != 0) return 0;

  *pEnd = begin + entrySize;
  *pVer = pHead->head.version;
  return 1;
}

/*
 * Find the last complete, checksum-valid entry of the newest log file and
 * return its version.
 *
 * The file is scanned in windows of up to WAL_SCAN_BUF_SIZE bytes from its end
 * towards its start. Consecutive windows overlap by sizeof(magic), so a magic
 * number straddling a window boundary is still found. Anything after the last
 * valid entry (e.g. a partially written entry) is truncated away, and the
 * recorded file size and pWal->totSize are adjusted accordingly.
 *
 * Returns the version of the last valid entry, or -1 with terrno set.
 */
static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
  int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
  ASSERT(sz > 0);

  SWalFileInfo* pLastFileInfo = taosArrayGet(pWal->fileInfoSet, sz - 1);
  char          fnameStr[WAL_FILE_LEN];
  walBuildLogName(pWal, pLastFileInfo->firstVer, fnameStr);

  int64_t fileSize = 0;
  taosStatFile(fnameStr, &fileSize, NULL);
  pLastFileInfo->fileSize = fileSize;

  uint64_t  magic = WAL_MAGIC;
  int64_t   readSize = TMIN(WAL_SCAN_BUF_SIZE, fileSize);
  int64_t   bufCap = readSize + (int64_t)sizeof(magic);
  int64_t   bufLen = 0;
  int64_t   offset = fileSize - readSize;
  int64_t   retVer = -1;
  int64_t   lastEntryEndOffset = -1;
  bool      found = false;
  char*     buf = NULL;
  TdFilePtr pFile = NULL;

  // write access is required: a torn tail is truncated below
  pFile = taosOpenFile(fnameStr, TD_FILE_READ | TD_FILE_WRITE);
  if (pFile == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    return -1;
  }

  buf = taosMemoryMalloc(bufCap);
  if (buf == NULL) {
    terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
    goto _err;
  }

  while (1) {
    if (taosLSeekFile(pFile, offset, SEEK_SET) < 0 || taosReadFile(pFile, buf, readSize) != readSize) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      goto _err;
    }
    bufLen = readSize;

    // keep the highest-offset valid entry of this window
    int64_t pos = 0;
    char*   candidate = NULL;
    while (pos < readSize &&
           (candidate = tmemmem(buf + pos, readSize - pos, (char*)&magic, sizeof(uint64_t))) != NULL) {
      pos = candidate - buf;
      int64_t entryEnd = 0;
      int64_t entryVer = -1;
      int32_t ret = walScanCheckEntry(pFile, fileSize, offset, pos, &buf, &bufCap, &bufLen, &entryEnd, &entryVer);
      if (ret < 0) {
        goto _err;
      }
      if (ret > 0) {
        found = true;
        retVer = entryVer;
        lastEntryEndOffset = entryEnd;
      }
      pos++;
    }

    if (found || offset == 0) break;

    // step one window back, overlapping by the magic size; clamp at file start
    int64_t next = offset - readSize + (int64_t)sizeof(magic);
    offset = next > 0 ? next : 0;
  }

  if (!found) {
    // file corrupted, no complete log
    // TODO delete and search in previous files
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    goto _err;
  }

  if (lastEntryEndOffset != fileSize) {
    wWarn("vgId:%d repair meta truncate file %s to %" PRId64 ", orig size %" PRId64, pWal->cfg.vgId, fnameStr,
          lastEntryEndOffset, fileSize);
    if (taosFtruncateFile(pFile, lastEntryEndOffset) < 0) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      goto _err;
    }
    pLastFileInfo->fileSize = lastEntryEndOffset;
    pWal->totSize -= (fileSize - lastEntryEndOffset);
  }

  taosCloseFile(&pFile);
  taosMemoryFree(buf);
  return retVer;

_err:
  taosCloseFile(&pFile);
  taosMemoryFree(buf);
  return -1;
}

/*
 * Reconcile pWal->fileInfoSet with the "<firstVer>.log" files actually present
 * in pWal->path.
 *
 * Sets writeCur and vers.firstVer. If the file count changed, or the newest
 * file's size differs from the recorded one, the newest file is rescanned for
 * its last valid version, and the meta is saved again.
 *
 * Returns 0 on success, -1 on failure (terrno set where the failing call sets it).
 */
int walCheckAndRepairMeta(SWal* pWal) {
  // load log files, get first/snapshot/last version info
  const char* logPattern = "^[0-9]+\\.log$";
  regex_t     logRegPattern;

  SArray* actualLog = taosArrayInit(8, sizeof(SWalFileInfo));
  if (actualLog == NULL) {
    terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
    return -1;
  }
  regcomp(&logRegPattern, logPattern, REG_EXTENDED);

  TdDirPtr pDir = taosOpenDir(pWal->path);
  if (pDir == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    wError("vgId:%d, path:%s, failed to open since %s", pWal->cfg.vgId, pWal->path, strerror(errno));
    regfree(&logRegPattern);
    taosArrayDestroy(actualLog);
    return -1;
  }

  // collect every "<firstVer>.log" file of the wal directory
  TdDirEntryPtr pDirEntry;
  while ((pDirEntry = taosReadDir(pDir)) != NULL) {
    char* name = taosDirEntryBaseName(taosGetDirEntryName(pDirEntry));
    if (regexec(&logRegPattern, name, 0, NULL, 0) == 0) {
      SWalFileInfo fileInfo;
      memset(&fileInfo, -1, sizeof(SWalFileInfo));
      sscanf(name, "%" PRId64 ".log", &fileInfo.firstVer);
      taosArrayPush(actualLog, &fileInfo);
    }
  }

  taosCloseDir(&pDir);
  regfree(&logRegPattern);

  taosArraySort(actualLog, compareWalFileInfo);

  int metaFileNum = taosArrayGetSize(pWal->fileInfoSet);
  int actualFileNum = taosArrayGetSize(actualLog);

  // TODO: remove trailing zero-sized log files (and their idx files) before syncing

  if (metaFileNum > actualFileNum) {
    // NOTE(review): assumes the missing files are the oldest ones, so entries are dropped from the front — confirm
    taosArrayPopFrontBatch(pWal->fileInfoSet, metaFileNum - actualFileNum);
  } else if (metaFileNum < actualFileNum) {
    for (int i = metaFileNum; i < actualFileNum; i++) {
      SWalFileInfo* pFileInfo = taosArrayGet(actualLog, i);
      taosArrayPush(pWal->fileInfoSet, pFileInfo);
    }
  }
  taosArrayDestroy(actualLog);

  pWal->writeCur = actualFileNum - 1;
  if (actualFileNum > 0) {
    pWal->vers.firstVer = ((SWalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;

    SWalFileInfo* pLastFileInfo = taosArrayGet(pWal->fileInfoSet, actualFileNum - 1);
    char          fnameStr[WAL_FILE_LEN];
    walBuildLogName(pWal, pLastFileInfo->firstVer, fnameStr);
    int64_t fileSize = 0;
    taosStatFile(fnameStr, &fileSize, NULL);

    if (metaFileNum != actualFileNum || pLastFileInfo->fileSize != fileSize) {
      pLastFileInfo->fileSize = fileSize;
      int64_t lastVer = walScanLogGetLastVer(pWal);
      if (lastVer < 0) {
        // terrno has been set by walScanLogGetLastVer
        return -1;
      }
      pWal->vers.lastVer = lastVer;
      pLastFileInfo->lastVer = lastVer;

      if (walSaveMeta(pWal) < 0) {
        return -1;
      }
    }
  }
  return 0;
}

/*
 * Check and repair the idx file belonging to one log file.
 *
 * The idx file holds one SWalIdxEntry per version, starting at
 * pFileInfo->firstVer. Repair steps:
 *   1. Drop a trailing partial entry.
 *   2. Drop trailing entries whose version does not match their position.
 *   3. Append the entries that are still missing up to pFileInfo->lastVer, by
 *      walking the log file head by head.
 *
 * Returns 0 on success, -1 on failure.
 */
static int32_t walCheckAndRepairIdxFile(SWal* pWal, SWalFileInfo* pFileInfo) {
  const int64_t entrySize = (int64_t)sizeof(SWalIdxEntry);
  char          fnameStr[WAL_FILE_LEN];
  char          fLogNameStr[WAL_FILE_LEN];
  TdFilePtr     pIdxFile = NULL;
  TdFilePtr     pLogFile = NULL;
  int64_t       fsize = 0;
  int64_t       left = 0;
  int64_t       offset = 0;
  SWalIdxEntry  idxEntry = {.ver = pFileInfo->firstVer};

  walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr);
  pIdxFile = taosOpenFile(fnameStr, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE);
  if (pIdxFile == NULL) {
    ASSERT(0);
    terrno = TAOS_SYSTEM_ERROR(errno);
    wError("vgId:%d, cannot open file %s, since %s", pWal->cfg.vgId, fnameStr, terrstr());
    return -1;
  }

  taosFStatFile(pIdxFile, &fsize, NULL);
  // fast path: exactly one entry per version in [firstVer, lastVer]
  if (fsize >= 0 &&
      (uint64_t)fsize == (uint64_t)(pFileInfo->lastVer - pFileInfo->firstVer + 1) * sizeof(SWalIdxEntry)) {
    taosCloseFile(&pIdxFile);
    return 0;
  }

  // drop a trailing partial entry
  left = fsize % entrySize;
  offset = taosLSeekFile(pIdxFile, -left, SEEK_END);
  if (offset < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    goto _err;
  }
  if (left != 0) {
    taosFtruncateFile(pIdxFile, offset);
    wWarn("vgId:%d wal truncate file %s to offset %" PRId64 " since size invalid, file size %" PRId64,
          pWal->cfg.vgId, fnameStr, offset, fsize);
  }
  offset -= entrySize;

  // walk back to the newest entry whose version matches its position in the file
  while (1) {
    if (offset < 0) {
      // nothing valid is left: restart the index at the file's first version,
      // whose entry starts at the beginning of the log file
      idxEntry.ver = pFileInfo->firstVer;
      idxEntry.offset = 0;
      if (taosLSeekFile(pIdxFile, 0, SEEK_SET) < 0 || taosWriteFile(pIdxFile, &idxEntry, entrySize) != entrySize) {
        terrno = TAOS_SYSTEM_ERROR(errno);
        goto _err;
      }
      break;
    }

    if (taosLSeekFile(pIdxFile, offset, SEEK_SET) < 0 || taosReadFile(pIdxFile, &idxEntry, entrySize) != entrySize) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      goto _err;
    }

    if (idxEntry.ver != pFileInfo->firstVer + offset / entrySize) {
      taosFtruncateFile(pIdxFile, offset);
      wWarn("vgId:%d wal truncate file %s to offset %" PRId64 " since entry invalid, entry ver %" PRId64
            ", entry offset %" PRId64,
            pWal->cfg.vgId, fnameStr, offset, idxEntry.ver, idxEntry.offset);
      offset -= entrySize;
    } else {
      break;
    }
  }

  // the idx file position is now right after the last valid entry: append the missing ones
  if (idxEntry.ver < pFileInfo->lastVer) {
    walBuildLogName(pWal, pFileInfo->firstVer, fLogNameStr);
    pLogFile = taosOpenFile(fLogNameStr, TD_FILE_READ);
    if (pLogFile == NULL) {
      terrno = TAOS_SYSTEM_ERROR(errno);
      wError("vgId:%d, cannot open file %s, since %s", pWal->cfg.vgId, fLogNameStr, terrstr());
      goto _err;
    }

    while (idxEntry.ver < pFileInfo->lastVer) {
      if (taosLSeekFile(pLogFile, idxEntry.offset, SEEK_SET) == -1) {
        terrno = TAOS_SYSTEM_ERROR(errno);
        wError("vgId:%d, cannot seek file %s at %" PRId64 ", since %s", pWal->cfg.vgId, fLogNameStr, idxEntry.offset,
               terrstr());
        goto _err;
      }

      SWalCkHead ckHead;
      int64_t    nread = taosReadFile(pLogFile, &ckHead, sizeof(SWalCkHead));
      if (nread != (int64_t)sizeof(SWalCkHead)) {
        terrno = nread < 0 ? TAOS_SYSTEM_ERROR(errno) : TSDB_CODE_WAL_FILE_CORRUPTED;
        wError("vgId:%d, cannot read head from file %s at %" PRId64 ", since %s", pWal->cfg.vgId, fLogNameStr,
               idxEntry.offset, terrstr());
        goto _err;
      }

      if (idxEntry.ver != ckHead.head.version) {
        // todo truncate this idx also
        terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
        wError("vgId:%d, invalid repair case, log seek to %" PRId64 " to find ver %" PRId64 ", actual ver %" PRId64,
               pWal->cfg.vgId, idxEntry.offset, idxEntry.ver, ckHead.head.version);
        goto _err;
      }

      idxEntry.ver = ckHead.head.version + 1;
      idxEntry.offset = idxEntry.offset + sizeof(SWalCkHead) + ckHead.head.bodyLen;
      wWarn("vgId:%d wal idx append new entry %" PRId64 " %" PRId64, pWal->cfg.vgId, idxEntry.ver, idxEntry.offset);

      if (taosWriteFile(pIdxFile, &idxEntry, entrySize) != entrySize) {
        terrno = TAOS_SYSTEM_ERROR(errno);
        goto _err;
      }
    }
    taosCloseFile(&pLogFile);
  }

  taosCloseFile(&pIdxFile);
  return 0;

_err:
  if (pLogFile != NULL) {
    taosCloseFile(&pLogFile);
  }
  taosCloseFile(&pIdxFile);
  return -1;
}

/*
 * Check and repair the idx file of every log file in pWal->fileInfoSet.
 * Returns 0 on success, -1 as soon as one file cannot be repaired.
 */
int walCheckAndRepairIdx(SWal* pWal) {
  int32_t sz = taosArrayGetSize(pWal->fileInfoSet);
  for (int32_t i = 0; i < sz; i++) {
    SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, i);
    if (walCheckAndRepairIdxFile(pWal, pFileInfo) != 0) {
      return -1;
    }
  }
  return 0;
}

/*
 * Close the current file info and append a new one.
 *
 * The previous last entry gets lastVer = vers.lastVer and closeTs = now.
 * The new entry starts at vers.lastVer + 1, with createTs = now.
 * Returns 0, or -1 with terrno set on allocation failure.
 */
int walRollFileInfo(SWal* pWal) {
  int64_t ts = taosGetTimestampSec();

  SArray* pArray = pWal->fileInfoSet;
  if (taosArrayGetSize(pArray) != 0) {
    SWalFileInfo* pInfo = taosArrayGetLast(pArray);
    pInfo->lastVer = pWal->vers.lastVer;
    pInfo->closeTs = ts;
  }

  SWalFileInfo newInfo = {
      .firstVer = pWal->vers.lastVer + 1,
      .lastVer = -1,
      .createTs = ts,
      .closeTs = -1,
      .fileSize = 0,
  };
  if (taosArrayPush(pArray, &newInfo) == NULL) {
    terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
    return -1;
  }
  return 0;
}

/*
 * Add `val` to `pObj` under `key` as a decimal string.
 * cJSON stores numbers as doubles, so strings are used to keep full 64-bit precision.
 */
static void walAddInt64Field(cJSON* pObj, const char* key, int64_t val) {
  char buf[30];
  snprintf(buf, sizeof(buf), "%" PRId64, val);
  cJSON_AddStringToObject(pObj, key, buf);
}

/*
 * Serialize pWal->vers and pWal->fileInfoSet to a JSON document:
 *   {"meta": {firstVer, snapshotVer, commitVer, lastVer},
 *    "files": [{firstVer, lastVer, createTs, closeTs, fileSize}, ...]}
 * Every value is a decimal string.
 * Returns the string produced by cJSON_Print (the caller frees it), or NULL with terrno set.
 */
char* walMetaSerialize(SWal* pWal) {
  ASSERT(pWal->fileInfoSet);
  int    sz = taosArrayGetSize(pWal->fileInfoSet);
  cJSON* pRoot = cJSON_CreateObject();
  cJSON* pMeta = cJSON_CreateObject();
  cJSON* pFiles = cJSON_CreateArray();
  if (pRoot == NULL || pMeta == NULL || pFiles == NULL) {
    if (pRoot) {
      cJSON_Delete(pRoot);
    }
    if (pMeta) {
      cJSON_Delete(pMeta);
    }
    if (pFiles) {
      cJSON_Delete(pFiles);
    }
    terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
    return NULL;
  }

  cJSON_AddItemToObject(pRoot, "meta", pMeta);
  walAddInt64Field(pMeta, "firstVer", pWal->vers.firstVer);
  walAddInt64Field(pMeta, "snapshotVer", pWal->vers.snapshotVer);
  walAddInt64Field(pMeta, "commitVer", pWal->vers.commitVer);
  walAddInt64Field(pMeta, "lastVer", pWal->vers.lastVer);

  cJSON_AddItemToObject(pRoot, "files", pFiles);
  SWalFileInfo* pData = pWal->fileInfoSet->pData;
  for (int i = 0; i < sz; i++) {
    SWalFileInfo* pInfo = &pData[i];
    cJSON*        pField = cJSON_CreateObject();
    if (pField == NULL) {
      cJSON_Delete(pRoot);  // also frees pMeta and pFiles, which it owns now
      terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
      return NULL;
    }
    cJSON_AddItemToArray(pFiles, pField);

    walAddInt64Field(pField, "firstVer", pInfo->firstVer);
    walAddInt64Field(pField, "lastVer", pInfo->lastVer);
    walAddInt64Field(pField, "createTs", pInfo->createTs);
    walAddInt64Field(pField, "closeTs", pInfo->closeTs);
    walAddInt64Field(pField, "fileSize", pInfo->fileSize);
  }

  char* serialized = cJSON_Print(pRoot);
  cJSON_Delete(pRoot);
  if (serialized == NULL) {
    terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
  }
  return serialized;
}

/*
 * Parse the decimal string stored under `key` of `pObj` into *pVal.
 * Returns 0 on success, or -1 when the item is missing, is not a string, or
 * is not a complete in-range base-10 integer. A NULL `pObj` yields -1.
 */
static int32_t walGetInt64Field(cJSON* pObj, const char* key, int64_t* pVal) {
  const char* str = cJSON_GetStringValue(cJSON_GetObjectItem(pObj, key));
  if (str == NULL) return -1;

  char* end = NULL;
  errno = 0;
  long long val = strtoll(str, &end, 10);
  if (end == str || *end != '\0' || errno == ERANGE) return -1;

  *pVal = (int64_t)val;
  return 0;
}

/*
 * Load a JSON document produced by walMetaSerialize into pWal->vers and the
 * (empty) pWal->fileInfoSet, and set writeCur to the last file index.
 * Returns 0 on success. On malformed input, returns -1 with terrno set to
 * TSDB_CODE_WAL_FILE_CORRUPTED; pWal->vers and the array size are then left untouched.
 */
int walMetaDeserialize(SWal* pWal, const char* bytes) {
  ASSERT(taosArrayGetSize(pWal->fileInfoSet) == 0);
  int64_t       firstVer = 0, snapshotVer = 0, commitVer = 0, lastVer = 0;
  cJSON*        pRoot = NULL;
  cJSON*        pMeta = NULL;
  cJSON*        pFiles = NULL;
  SArray*       pArray = pWal->fileInfoSet;
  SWalFileInfo* pData = NULL;
  int           sz = 0;

  pRoot = cJSON_Parse(bytes);
  if (pRoot == NULL) {
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    return -1;
  }

  pMeta = cJSON_GetObjectItem(pRoot, "meta");
  if (walGetInt64Field(pMeta, "firstVer", &firstVer) != 0 || walGetInt64Field(pMeta, "snapshotVer", &snapshotVer) != 0 ||
      walGetInt64Field(pMeta, "commitVer", &commitVer) != 0 || walGetInt64Field(pMeta, "lastVer", &lastVer) != 0) {
    goto _err;
  }

  pFiles = cJSON_GetObjectItem(pRoot, "files");
  sz = cJSON_GetArraySize(pFiles);
  // deserialize
  taosArrayEnsureCap(pArray, sz);
  pData = pArray->pData;
  for (int i = 0; i < sz; i++) {
    cJSON*        pInfoJson = cJSON_GetArrayItem(pFiles, i);
    SWalFileInfo* pInfo = &pData[i];
    if (walGetInt64Field(pInfoJson, "firstVer", &pInfo->firstVer) != 0 ||
        walGetInt64Field(pInfoJson, "lastVer", &pInfo->lastVer) != 0 ||
        walGetInt64Field(pInfoJson, "createTs", &pInfo->createTs) != 0 ||
        walGetInt64Field(pInfoJson, "closeTs", &pInfo->closeTs) != 0 ||
        walGetInt64Field(pInfoJson, "fileSize", &pInfo->fileSize) != 0) {
      goto _err;
    }
  }

  pWal->vers.firstVer = firstVer;
  pWal->vers.snapshotVer = snapshotVer;
  pWal->vers.commitVer = commitVer;
  pWal->vers.lastVer = lastVer;
  taosArraySetSize(pArray, sz);
  pWal->writeCur = sz - 1;
  cJSON_Delete(pRoot);
  return 0;

_err:
  cJSON_Delete(pRoot);
  terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
  return -1;
}

/*
 * Return the version number N of the first "meta-ver<N>" entry found in
 * pWal->path. Returns -1 when no such entry exists or the directory cannot be opened.
 */
static int walFindCurMetaVer(SWal* pWal) {
  const char* pattern = "^meta-ver[0-9]+$";
  regex_t     walMetaRegexPattern;
  regcomp(&walMetaRegexPattern, pattern, REG_EXTENDED);

  TdDirPtr pDir = taosOpenDir(pWal->path);
  if (pDir == NULL) {
    wError("vgId:%d, path:%s, failed to open since %s", pWal->cfg.vgId, pWal->path, strerror(errno));
    regfree(&walMetaRegexPattern);
    return -1;
  }

  TdDirEntryPtr pDirEntry;

  // find existing meta-ver[x].json
  int metaVer = -1;
  while ((pDirEntry = taosReadDir(pDir)) != NULL) {
    char* name = taosDirEntryBaseName(taosGetDirEntryName(pDirEntry));
    int   code = regexec(&walMetaRegexPattern, name, 0, NULL, 0);
    if (code == 0) {
      sscanf(name, "meta-ver%d", &metaVer);
      wDebug("vgId:%d, wal find current meta: %s is the meta file, ver %d", pWal->cfg.vgId, name, metaVer);
      break;
    }
    wDebug("vgId:%d, wal find current meta: %s is not meta file", pWal->cfg.vgId, name);
  }
  taosCloseDir(&pDir);
  regfree(&walMetaRegexPattern);
  return metaVer;
}

/*
 * Persist the current meta as "meta-ver<N+1>", where N is the current meta
 * version. The new file is written and synced first; only then is the old one
 * removed. A partially written new file is deleted on failure.
 * Returns 0 on success, -1 with terrno set on failure.
 */
int walSaveMeta(SWal* pWal) {
  int  metaVer = walFindCurMetaVer(pWal);
  char fnameStr[WAL_FILE_LEN];
  walBuildMetaName(pWal, metaVer + 1, fnameStr);

  char* serialized = walMetaSerialize(pWal);
  if (serialized == NULL) {
    return -1;
  }
  int len = strlen(serialized);

  // TRUNC: a leftover file with this name must not keep stale trailing bytes
  TdFilePtr pMetaFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
  if (pMetaFile == NULL) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    taosMemoryFree(serialized);
    return -1;
  }

  if (len != taosWriteFile(pMetaFile, serialized, len) || taosFsyncFile(pMetaFile) < 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    taosCloseFile(&pMetaFile);
    taosRemoveFile(fnameStr);
    taosMemoryFree(serialized);
    return -1;
  }

  taosCloseFile(&pMetaFile);
  taosMemoryFree(serialized);

  // delete old file
  if (metaVer > -1) {
    walBuildMetaName(pWal, metaVer, fnameStr);
    taosRemoveFile(fnameStr);
  }
  return 0;
}

/*
 * Read the current "meta-ver<N>" file and load it via walMetaDeserialize.
 * pWal->fileInfoSet must be empty. Returns -1 when no meta file is found or it
 * cannot be read or parsed.
 */
int walLoadMeta(SWal* pWal) {
  ASSERT(pWal->fileInfoSet->size == 0);
  // find existing meta file
  int metaVer = walFindCurMetaVer(pWal);
  if (metaVer == -1) {
    wDebug("vgId:%d wal find meta ver %d", pWal->cfg.vgId, metaVer);
    return -1;
  }
  char fnameStr[WAL_FILE_LEN];
  walBuildMetaName(pWal, metaVer, fnameStr);

  // read metafile
  int64_t fileSize = 0;
  taosStatFile(fnameStr, &fileSize, NULL);
  int   size = (int)fileSize;
  char* buf = taosMemoryMalloc(size + 5);
  if (buf == NULL) {
    terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
    return -1;
  }
  memset(buf, 0, size + 5);  // guarantees NUL termination for the JSON parser

  TdFilePtr pFile = taosOpenFile(fnameStr, TD_FILE_READ);
  if (pFile == NULL) {
    terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
    taosMemoryFree(buf);
    return -1;
  }
  if (taosReadFile(pFile, buf, size) != size) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    taosCloseFile(&pFile);
    taosMemoryFree(buf);
    return -1;
  }

  // load into fileInfoSet
  int code = walMetaDeserialize(pWal, buf);
  taosCloseFile(&pFile);
  taosMemoryFree(buf);
  return code;
}

/* Remove the current "meta-ver<N>" file if there is one. Always returns 0. */
int walRemoveMeta(SWal* pWal) {
  int metaVer = walFindCurMetaVer(pWal);
  if (metaVer == -1) return 0;
  char fnameStr[WAL_FILE_LEN];
  walBuildMetaName(pWal, metaVer, fnameStr);
  taosRemoveFile(fnameStr);
  return 0;
}