diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index b514648bbd32a01c5e3cd234728e2e9575da725e..a72765583efd58bc5c371a74d801e23a524c7d5d 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -55,12 +55,14 @@ typedef struct { uint32_t signature; uint32_t cksumHead; uint32_t cksumBody; - //char cont[]; + char cont[]; } SWalHead; typedef struct { int32_t vgId; int32_t fsyncPeriod; // millisecond + int32_t rollPeriod; + int64_t segSize; EWalType walLevel; // wal level } SWalCfg; @@ -87,10 +89,14 @@ typedef struct SWal { // cfg int32_t vgId; int32_t fsyncPeriod; // millisecond - int32_t fsyncSeq; int32_t rollPeriod; // second int64_t segSize; + int64_t rtSize; EWalType level; + //total size + int64_t totSize; + //fsync seq + int32_t fsyncSeq; //reference int64_t refId; //current tfd @@ -98,25 +104,32 @@ typedef struct SWal { int64_t curIdxTfd; //current version int64_t curVersion; - int64_t curLogOffset; + //current file version - int64_t curFileFirstVersion; - int64_t curFileLastVersion; - //wal fileset version + //int64_t curFileFirstVersion; + //int64_t curFileLastVersion; + + //wal lifecycle int64_t firstVersion; int64_t snapshotVersion; + int64_t commitVersion; int64_t lastVersion; - int64_t lastFileName; + + //last file + //int64_t lastFileName; + //roll status int64_t lastRollSeq; - int64_t lastFileWriteSize; + //int64_t lastFileWriteSize; + + //file set + int32_t fileCursor; + SArray* fileInfoSet; //ctl int32_t curStatus; pthread_mutex_t mutex; //path char path[WAL_PATH_LEN]; - //file set - SArray* fileSet; //reusable write head SWalHead head; } SWal; // WAL HANDLE @@ -133,7 +146,7 @@ int32_t walAlter(SWal *, SWalCfg *pCfg); void walClose(SWal *); // write -int64_t walWrite(SWal *, int64_t index, uint8_t msgType, void *body, int32_t bodyLen); +int64_t walWrite(SWal *, int64_t index, uint8_t msgType, const void *body, int32_t bodyLen); void walFsync(SWal *, bool force); // apis for lifecycle management diff --git a/include/util/tfile.h b/include/util/tfile.h index 3d0e2177ac123cd0600498806fa3ab5205485ee4..af4c19e7d19ebcd6fb9d24f435bb51073cd9836b 100644 --- a/include/util/tfile.h +++ b/include/util/tfile.h @@ -16,6 +16,8 @@ #ifndef _TD_UTIL_FILE_H #define _TD_UTIL_FILE_H +#include "os.h" + #ifdef __cplusplus extern "C" { #endif diff --git a/source/libs/wal/CMakeLists.txt b/source/libs/wal/CMakeLists.txt index e5697415f17acb5358a85e0540462ea4c7871640..4d2dd97c8756f2633565e2d92a49738989e4ba8f 100644 --- a/source/libs/wal/CMakeLists.txt +++ b/source/libs/wal/CMakeLists.txt @@ -8,6 +8,11 @@ target_include_directories( target_link_libraries( wal + PUBLIC cjson PUBLIC os PUBLIC util ) + +if(${BUILD_TEST}) + add_subdirectory(test) +endif(${BUILD_TEST}) diff --git a/source/libs/wal/inc/walInt.h b/source/libs/wal/inc/walInt.h index 285d7e25762ba3e6a6912e8a542b9f3afaab892c..ae655d61da7fe9b9be42a9fbbba7a07f7d747435 100644 --- a/source/libs/wal/inc/walInt.h +++ b/source/libs/wal/inc/walInt.h @@ -23,9 +23,73 @@ extern "C" { #endif -int walGetFile(SWal* pWal, int32_t version); +//meta section begin +typedef struct WalFileInfo { + int64_t firstVer; + int64_t lastVer; + int64_t createTs; + int64_t closeTs; + int64_t fileSize; +} WalFileInfo; + +static inline int32_t compareWalFileInfo(const void* pLeft, const void* pRight) { + WalFileInfo* pInfoLeft = (WalFileInfo*)pLeft; + WalFileInfo* pInfoRight = (WalFileInfo*)pRight; + return compareInt64Val(&pInfoLeft->firstVer, &pInfoRight->firstVer); +} + +static inline int64_t walGetLastFileSize(SWal* pWal) { + WalFileInfo* pInfo = (WalFileInfo*)taosArrayGetLast(pWal->fileInfoSet); + return pInfo->fileSize; +} + +static inline int64_t walGetLastFileFirstVer(SWal* pWal) { + WalFileInfo* pInfo = (WalFileInfo*)taosArrayGetLast(pWal->fileInfoSet); + return pInfo->firstVer; +} + +static inline int64_t walGetCurFileFirstVer(SWal* pWal) { + WalFileInfo* pInfo = (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->fileCursor); + return pInfo->firstVer; +} + +static inline int64_t walGetCurFileLastVer(SWal* pWal) { + WalFileInfo* pInfo = (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->fileCursor); + return pInfo->firstVer; +} + +static inline int64_t walGetCurFileOffset(SWal* pWal) { + WalFileInfo* pInfo = (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->fileCursor); + return pInfo->fileSize; +} + +static inline bool walCurFileClosed(SWal* pWal) { + return taosArrayGetSize(pWal->fileInfoSet) != pWal->fileCursor; +} + +static inline WalFileInfo* walGetCurFileInfo(SWal* pWal) { + return (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->fileCursor); +} + +static inline int walBuildLogName(SWal*pWal, int64_t fileFirstVer, char* buf) { + return sprintf(buf, "%s/%" PRId64 "." WAL_LOG_SUFFIX, pWal->path, fileFirstVer); +} + +static inline int walBuildIdxName(SWal*pWal, int64_t fileFirstVer, char* buf) { + return sprintf(buf, "%s/%" PRId64 "." WAL_INDEX_SUFFIX, pWal->path, fileFirstVer); +} + +int walReadMeta(SWal* pWal); +int walWriteMeta(SWal* pWal); +int walRollFileInfo(SWal* pWal); + +char* walFileInfoSerialize(SWal* pWal); +SArray* walFileInfoDeserialize(const char* bytes); +//meta section end int64_t walGetSeq(); +int walSeekVer(SWal *pWal, int64_t ver); +int walRoll(SWal *pWal); #ifdef __cplusplus } diff --git a/source/libs/wal/src/walIndex.c b/source/libs/wal/src/walIndex.c index 1aa64b34b5e45958218faa43beb0499f01113414..b4d66226d6b279212a3fbc5f7ce19209c27e4262 100644 --- a/source/libs/wal/src/walIndex.c +++ b/source/libs/wal/src/walIndex.c @@ -27,7 +27,7 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) { int64_t logTfd = pWal->curLogTfd; //seek position - int64_t offset = (ver - pWal->curFileFirstVersion) * WAL_IDX_ENTRY_SIZE; + int64_t offset = (ver - walGetCurFileFirstVer(pWal)) * WAL_IDX_ENTRY_SIZE; code = tfLseek(idxTfd, offset, SEEK_SET); if(code != 0) { @@ -43,7 +43,7 @@ static int walSeekFilePos(SWal* pWal, int64_t ver) { if (code != 0) { } - pWal->curLogOffset = readBuf[1]; + /*pWal->curLogOffset = readBuf[1];*/ pWal->curVersion = ver; return code; } @@ -60,27 +60,27 @@ static int walChangeFile(SWal *pWal, int64_t ver) { if(code != 0) { //TODO } + WalFileInfo tmpInfo; + tmpInfo.firstVer = ver; //bsearch in fileSet - int64_t* pRet = taosArraySearch(pWal->fileSet, &ver, compareInt64Val, TD_LE); + WalFileInfo* pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); ASSERT(pRet != NULL); - int64_t fname = *pRet; - if(fname < pWal->lastFileName) { + int64_t fileFirstVer = pRet->firstVer; + //closed + if(taosArrayGetLast(pWal->fileInfoSet) != pRet) { pWal->curStatus &= ~WAL_CUR_FILE_WRITABLE; - pWal->curFileLastVersion = pRet[1]-1; - sprintf(fnameStr, "%"PRId64"."WAL_INDEX_SUFFIX, fname); + walBuildIdxName(pWal, fileFirstVer, fnameStr); idxTfd = tfOpenRead(fnameStr); - sprintf(fnameStr, "%"PRId64"."WAL_LOG_SUFFIX, fname); + walBuildLogName(pWal, fileFirstVer, fnameStr); logTfd = tfOpenRead(fnameStr); } else { pWal->curStatus |= WAL_CUR_FILE_WRITABLE; - pWal->curFileLastVersion = -1; - sprintf(fnameStr, "%"PRId64"."WAL_INDEX_SUFFIX, fname); + walBuildIdxName(pWal, fileFirstVer, fnameStr); idxTfd = tfOpenReadWrite(fnameStr); - sprintf(fnameStr, "%"PRId64"."WAL_LOG_SUFFIX, fname); + walBuildLogName(pWal, fileFirstVer, fnameStr); logTfd = tfOpenReadWrite(fnameStr); } - pWal->curFileFirstVersion = fname; pWal->curLogTfd = logTfd; pWal->curIdxTfd = idxTfd; return code; @@ -102,8 +102,7 @@ int walSeekVer(SWal *pWal, int64_t ver) { if(ver < pWal->snapshotVersion) { //TODO: seek snapshotted log, invalid in some cases } - if(ver < pWal->curFileFirstVersion || - (pWal->curFileLastVersion != -1 && ver > pWal->curFileLastVersion)) { + if(ver < walGetCurFileFirstVer(pWal) || (ver > walGetCurFileLastVer(pWal))) { walChangeFile(pWal, ver); } walSeekFilePos(pWal, ver); diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c new file mode 100644 index 0000000000000000000000000000000000000000..2eec4328e605154aa47b45e8b13c71857ec201b4 --- /dev/null +++ b/source/libs/wal/src/walMeta.c @@ -0,0 +1,193 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "os.h" +#include "taoserror.h" +#include "tref.h" +#include "tfile.h" +#include "cJSON.h" +#include "walInt.h" + +#include +#include + +int walRollFileInfo(SWal* pWal) { + int64_t ts = taosGetTimestampSec(); + + SArray* pArray = pWal->fileInfoSet; + if(taosArrayGetSize(pArray) != 0) { + WalFileInfo *pInfo = taosArrayGetLast(pArray); + pInfo->lastVer = pWal->lastVersion; + pInfo->closeTs = ts; + } + + WalFileInfo *pNewInfo = malloc(sizeof(WalFileInfo)); + if(pNewInfo == NULL) { + return -1; + } + pNewInfo->firstVer = pWal->lastVersion + 1; + pNewInfo->lastVer = -1; + pNewInfo->createTs = ts; + pNewInfo->closeTs = -1; + pNewInfo->fileSize = 0; + taosArrayPush(pWal->fileInfoSet, pNewInfo); + return 0; +} + +char* walFileInfoSerialize(SWal* pWal) { + char buf[30]; + if(pWal == NULL || pWal->fileInfoSet == NULL) return 0; + int sz = pWal->fileInfoSet->size; + cJSON* root = cJSON_CreateArray(); + cJSON* field; + if(root == NULL) { + //TODO + return NULL; + } + WalFileInfo* pData = pWal->fileInfoSet->pData; + for(int i = 0; i < sz; i++) { + WalFileInfo* pInfo = &pData[i]; + cJSON_AddItemToArray(root, field = cJSON_CreateObject()); + if(field == NULL) { + cJSON_Delete(root); + return NULL; + } + //cjson only support int32_t or double + //string are used to prohibit the loss of precision + sprintf(buf, "%ld", pInfo->firstVer); + cJSON_AddStringToObject(field, "firstVer", buf); + sprintf(buf, "%ld", pInfo->lastVer); + cJSON_AddStringToObject(field, "lastVer", buf); + sprintf(buf, "%ld", pInfo->createTs); + cJSON_AddStringToObject(field, "createTs", buf); + sprintf(buf, "%ld", pInfo->closeTs); + cJSON_AddStringToObject(field, "closeTs", buf); + sprintf(buf, "%ld", pInfo->fileSize); + cJSON_AddStringToObject(field, "fileSize", buf); + } + return cJSON_Print(root); +} + +SArray* walFileInfoDeserialize(const char* bytes) { + cJSON *root, *pInfoJson, *pField; + root = cJSON_Parse(bytes); + int sz = cJSON_GetArraySize(root); + //deserialize + SArray* pArray = taosArrayInit(sz, sizeof(WalFileInfo)); + WalFileInfo *pData = pArray->pData; + for(int i = 0; i < sz; i++) { + cJSON* pInfoJson = cJSON_GetArrayItem(root, i); + WalFileInfo* pInfo = &pData[i]; + pField = cJSON_GetObjectItem(pInfoJson, "firstVer"); + pInfo->firstVer = atoll(cJSON_GetStringValue(pField)); + pField = cJSON_GetObjectItem(pInfoJson, "lastVer"); + pInfo->lastVer = atoll(cJSON_GetStringValue(pField)); + pField = cJSON_GetObjectItem(pInfoJson, "createTs"); + pInfo->createTs = atoll(cJSON_GetStringValue(pField)); + pField = cJSON_GetObjectItem(pInfoJson, "closeTs"); + pInfo->closeTs = atoll(cJSON_GetStringValue(pField)); + pField = cJSON_GetObjectItem(pInfoJson, "fileSize"); + pInfo->fileSize = atoll(cJSON_GetStringValue(pField)); + } + taosArraySetSize(pArray, sz); + return pArray; +} + +static inline int walBuildMetaName(SWal* pWal, int metaVer, char* buf) { + return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer); +} + +static int walFindCurMetaVer(SWal* pWal) { + const char * pattern = "^meta-ver[0-9]+$"; + regex_t walMetaRegexPattern; + regcomp(&walMetaRegexPattern, pattern, REG_EXTENDED); + + DIR *dir = opendir(pWal->path); + if(dir == NULL) { + wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno)); + return -1; + } + + struct dirent* ent; + + //find existing meta-ver[x].json + int metaVer = -1; + while((ent = readdir(dir)) != NULL) { + char *name = basename(ent->d_name); + int code = regexec(&walMetaRegexPattern, name, 0, NULL, 0); + if(code == 0) { + sscanf(name, "meta-ver%d", &metaVer); + break; + } + } + return metaVer; +} + +int walWriteMeta(SWal* pWal) { + int metaVer = walFindCurMetaVer(pWal); + char fnameStr[WAL_FILE_LEN]; + walBuildMetaName(pWal, metaVer+1, fnameStr); + int metaTfd = tfOpenCreateWrite(fnameStr); + if(metaTfd < 0) { + return -1; + } + char* serialized = walFileInfoSerialize(pWal); + int len = strlen(serialized); + if(len != tfWrite(metaTfd, serialized, len)) { + //TODO:clean file + return -1; + } + + tfClose(metaTfd); + //delete old file + if(metaVer > -1) { + walBuildMetaName(pWal, metaVer, fnameStr); + remove(fnameStr); + } + return 0; +} + +int walReadMeta(SWal* pWal) { + ASSERT(pWal->fileInfoSet->size == 0); + //find existing meta file + int metaVer = walFindCurMetaVer(pWal); + if(metaVer == -1) { + return 0; + } + char fnameStr[WAL_FILE_LEN]; + walBuildMetaName(pWal, metaVer, fnameStr); + //read metafile + struct stat statbuf; + stat(fnameStr, &statbuf); + int size = statbuf.st_size; + char* buf = malloc(size + 5); + if(buf == NULL) { + return -1; + } + int tfd = tfOpenRead(fnameStr); + if(tfRead(tfd, buf, size) != size) { + free(buf); + return -1; + } + //load into fileInfoSet + pWal->fileInfoSet = walFileInfoDeserialize(buf); + if(pWal->fileInfoSet == NULL) { + free(buf); + return -1; + } + free(buf); + return 0; +} diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index bc2e687069868109a96488ac7c4a51d57c0bfdc0..acb173b17bdd79c78e0e2d04af6b70014ac60c52 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -64,43 +64,31 @@ int32_t walInit() { void walCleanUp() { walStopThread(); taosCloseRef(tsWal.refSetId); + atomic_store_8(&tsWal.inited, 0); wInfo("wal module is cleaned up"); } -static int walLoadFileset(SWal *pWal) { - DIR *dir = opendir(pWal->path); - if (dir == NULL) { - wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno)); - return -1; - } - - struct dirent* ent; - while ((ent = readdir(dir)) != NULL) { - char *name = ent->d_name; - name[WAL_NOSUFFIX_LEN] = 0; - //validate file name by regex matching - if(1 /* TODO:regex match */) { - int64_t fnameInt64 = atoll(name); - taosArrayPush(pWal->fileSet, &fnameInt64); - } - } - taosArraySort(pWal->fileSet, compareInt64Val); - return 0; -} - SWal *walOpen(const char *path, SWalCfg *pCfg) { SWal *pWal = malloc(sizeof(SWal)); if (pWal == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); return NULL; } - - pWal->vgId = pCfg->vgId; pWal->curLogTfd = -1; pWal->curIdxTfd = -1; - pWal->level = pCfg->walLevel; + + //set config + pWal->vgId = pCfg->vgId; pWal->fsyncPeriod = pCfg->fsyncPeriod; + pWal->rollPeriod = pCfg->rollPeriod; + pWal->segSize = pCfg->segSize; + pWal->level = pCfg->walLevel; + + //init status + pWal->lastVersion = -1; + pWal->lastRollSeq = -1; + //init write buffer memset(&pWal->head, 0, sizeof(SWalHead)); pWal->head.sver = 0; @@ -120,7 +108,6 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { walFreeObj(pWal); return NULL; } - walLoadFileset(pWal); wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->vgId, pWal, pWal->level, pWal->fsyncPeriod); @@ -153,8 +140,8 @@ void walClose(SWal *pWal) { pthread_mutex_lock(&pWal->mutex); tfClose(pWal->curLogTfd); tfClose(pWal->curIdxTfd); - taosArrayDestroy(pWal->fileSet); - pWal->fileSet = NULL; + /*taosArrayDestroy(pWal->fileInfoSet);*/ + /*pWal->fileInfoSet = NULL;*/ pthread_mutex_unlock(&pWal->mutex); taosRemoveRef(tsWal.refSetId, pWal->refId); } @@ -164,8 +151,8 @@ static int32_t walInitObj(SWal *pWal) { wError("vgId:%d, path:%s, failed to create directory since %s", pWal->vgId, pWal->path, strerror(errno)); return TAOS_SYSTEM_ERROR(errno); } - pWal->fileSet = taosArrayInit(0, sizeof(int64_t)); - if(pWal->fileSet == NULL) { + pWal->fileInfoSet = taosArrayInit(0, sizeof(WalFileInfo)); + if(pWal->fileInfoSet == NULL) { wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->vgId, pWal->path, strerror(errno)); return TAOS_SYSTEM_ERROR(errno); } @@ -180,8 +167,10 @@ static void walFreeObj(void *wal) { tfClose(pWal->curLogTfd); tfClose(pWal->curIdxTfd); - taosArrayDestroy(pWal->fileSet); - pWal->fileSet = NULL; + taosArrayDestroy(pWal->fileInfoSet); + pWal->fileInfoSet = NULL; + taosArrayDestroy(pWal->fileInfoSet); + pWal->fileInfoSet = NULL; pthread_mutex_destroy(&pWal->mutex); tfree(pWal); } @@ -210,7 +199,7 @@ static void walFsyncAll() { wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, atomic_load_32(&tsWal.seq)); int32_t code = tfFsync(pWal->curLogTfd); if (code != 0) { - wError("vgId:%d, file:%"PRId64".log, failed to fsync since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(code)); + wError("vgId:%d, file:%"PRId64".log, failed to fsync since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(code)); } } pWal = taosIterateRef(tsWal.refSetId, pWal->refId); diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index b475183b7b271693865c5fa7a496608fd9b547c6..90ec5528c47298551a0e88cb8ceed847a5fbd02a 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -13,16 +13,56 @@ * along with this program. If not, see . */ -#include "wal.h" +#include "walInt.h" +#include "tfile.h" #include "tchecksum.h" -static int walValidateChecksum(SWalHead *pHead, void* body, int64_t bodyLen) { - return taosCheckChecksum((uint8_t*)pHead, sizeof(SWalHead) - sizeof(uint32_t)*2, pHead->cksumHead) && - taosCheckChecksum(body, bodyLen, pHead->cksumBody); +static inline int walValidHeadCksum(SWalHead* pHead) { + return taosCheckChecksum((uint8_t*)pHead, sizeof(SWalHead) - sizeof(uint32_t)*2, pHead->cksumHead); } +static inline int walValidBodyCksum(SWalHead* pHead) { + return taosCheckChecksum((uint8_t*)pHead->cont, pHead->len, pHead->cksumBody); +} + +static int walValidCksum(SWalHead *pHead, void* body, int64_t bodyLen) { + return walValidHeadCksum(pHead) && walValidBodyCksum(pHead); +} int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) { + int code; + code = walSeekVer(pWal, ver); + if(code != 0) { + return code; + } + if(*ppHead == NULL) { + void* ptr = realloc(*ppHead, sizeof(SWalHead)); + if(ptr == NULL) { + return -1; + } + *ppHead = ptr; + } + if(tfRead(pWal->curLogTfd, *ppHead, sizeof(SWalHead)) != sizeof(SWalHead)) { + return -1; + } + //TODO: endian compatibility processing after read + if(walValidHeadCksum(*ppHead) != 0) { + return -1; + } + void* ptr = realloc(*ppHead, sizeof(SWalHead) + (*ppHead)->len); + if(ptr == NULL) { + free(*ppHead); + *ppHead = NULL; + return -1; + } + if(tfRead(pWal->curLogTfd, (*ppHead)->cont, (*ppHead)->len) != (*ppHead)->len) { + return -1; + } + //TODO: endian compatibility processing after read + if(walValidBodyCksum(*ppHead) != 0) { + return -1; + } + return 0; } diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 69c83a9912addb6b6aff89c966b4ac085c301e32..3c656989389ddf52d9e562c9186677cbe4f323a7 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -21,29 +21,42 @@ #include "tfile.h" #include "walInt.h" +static void walFtruncate(SWal *pWal, int64_t ver); + int32_t walCommit(SWal *pWal, int64_t ver) { + ASSERT(pWal->snapshotVersion <= pWal->commitVersion); + ASSERT(pWal->commitVersion <= pWal->lastVersion); + ASSERT(ver >= pWal->commitVersion); + ASSERT(ver <= pWal->lastVersion); + pWal->commitVersion = ver; return 0; } int32_t walRollback(SWal *pWal, int64_t ver) { //TODO: ftruncate + ASSERT(ver > pWal->commitVersion); + ASSERT(ver <= pWal->lastVersion); + //seek position + walSeekVer(pWal, ver); + walFtruncate(pWal, ver); return 0; } int32_t walTakeSnapshot(SWal *pWal, int64_t ver) { pWal->snapshotVersion = ver; + WalFileInfo tmp; + tmp.firstVer = ver; //mark files safe to delete - int64_t* pRet = taosArraySearch(pWal->fileSet, &ver, compareInt64Val, TD_LE); - if(pRet != pWal->fileSet->pData) { - //delete files until less than retention size - - //find first file that exceeds retention time - - } + WalFileInfo* pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE); + //iterate files, until the searched result + //if totSize > rtSize, delete + //if createTs > retentionTs, delete + + //save snapshot ver, commit ver + + //make new array, remove files - //delete files living longer than retention limit - //remove file from fileset return 0; } @@ -138,105 +151,123 @@ void walRemoveAllOldFiles(void *handle) { } #endif -static int walRoll(SWal *pWal) { +int walRoll(SWal *pWal) { int code = 0; - code = tfClose(pWal->curIdxTfd); - if(code != 0) { - return code; + if(pWal->curIdxTfd != -1) { + code = tfClose(pWal->curIdxTfd); + if(code != 0) { + return -1; + } } - code = tfClose(pWal->curLogTfd); - if(code != 0) { - return code; + if(pWal->curLogTfd != -1) { + code = tfClose(pWal->curLogTfd); + if(code != 0) { + return -1; + } } int64_t idxTfd, logTfd; //create new file int64_t newFileFirstVersion = pWal->lastVersion + 1; char fnameStr[WAL_FILE_LEN]; - sprintf(fnameStr, "%"PRId64"."WAL_INDEX_SUFFIX, newFileFirstVersion); + walBuildIdxName(pWal, newFileFirstVersion, fnameStr); idxTfd = tfOpenCreateWrite(fnameStr); - sprintf(fnameStr, "%"PRId64"."WAL_LOG_SUFFIX, newFileFirstVersion); + if(idxTfd < 0) { + ASSERT(0); + return -1; + } + walBuildLogName(pWal, newFileFirstVersion, fnameStr); logTfd = tfOpenCreateWrite(fnameStr); + if(logTfd < 0) { + ASSERT(0); + return -1; + } + code = walRollFileInfo(pWal); + if(code != 0) { + ASSERT(0); + return -1; + } - taosArrayPush(pWal->fileSet, &newFileFirstVersion); - //switch file pWal->curIdxTfd = idxTfd; pWal->curLogTfd = logTfd; //change status - pWal->curFileLastVersion = -1; - pWal->curFileFirstVersion = newFileFirstVersion; - pWal->curVersion = newFileFirstVersion; - pWal->curLogOffset = 0; pWal->curStatus = WAL_CUR_FILE_WRITABLE & WAL_CUR_POS_WRITABLE; - pWal->lastFileName = newFileFirstVersion; - pWal->lastFileWriteSize = 0; pWal->lastRollSeq = walGetSeq(); return 0; } int walChangeFileToLast(SWal *pWal) { int64_t idxTfd, logTfd; - int64_t* pRet = taosArrayGetLast(pWal->fileSet); + WalFileInfo* pRet = taosArrayGetLast(pWal->fileInfoSet); ASSERT(pRet != NULL); - int64_t fname = *pRet; + int64_t fileFirstVer = pRet->firstVer; char fnameStr[WAL_FILE_LEN]; - sprintf(fnameStr, "%"PRId64"."WAL_INDEX_SUFFIX, fname); + walBuildIdxName(pWal, fileFirstVer, fnameStr); idxTfd = tfOpenReadWrite(fnameStr); - sprintf(fnameStr, "%"PRId64"."WAL_LOG_SUFFIX, fname); + if(idxTfd < 0) { + return -1; + } + walBuildLogName(pWal, fileFirstVer, fnameStr); logTfd = tfOpenReadWrite(fnameStr); + if(logTfd < 0) { + return -1; + } //switch file pWal->curIdxTfd = idxTfd; pWal->curLogTfd = logTfd; //change status - pWal->curFileLastVersion = -1; - pWal->curFileFirstVersion = fname; - pWal->curVersion = fname; - pWal->curLogOffset = 0; + pWal->curVersion = fileFirstVer; pWal->curStatus = WAL_CUR_FILE_WRITABLE; return 0; } -int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { +static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { int code = 0; //get index file if(!tfValid(pWal->curIdxTfd)) { code = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, file:%"PRId64".idx, failed to open since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno)); + + WalFileInfo* pInfo = taosArrayGet(pWal->fileInfoSet, pWal->fileCursor); + wError("vgId:%d, file:%"PRId64".idx, failed to open since %s", pWal->vgId, pInfo->firstVer, strerror(errno)); + return code; } int64_t writeBuf[2] = { ver, offset }; int size = tfWrite(pWal->curIdxTfd, writeBuf, sizeof(writeBuf)); if(size != sizeof(writeBuf)) { - //TODO: + return -1; } return 0; } -int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, void *body, int32_t bodyLen) { +int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, int32_t bodyLen) { if (pWal == NULL) return -1; + int code = 0; // no wal if (pWal->level == TAOS_WAL_NOLOG) return 0; if (index == pWal->lastVersion + 1) { - int64_t passed = walGetSeq() - pWal->lastRollSeq; - if(passed > pWal->rollPeriod) { - walRoll(pWal); - } else if(pWal->lastFileWriteSize > pWal->segSize) { - walRoll(pWal); + if(taosArrayGetSize(pWal->fileInfoSet) == 0) { + code = walRoll(pWal); + ASSERT(code == 0); } else { - walChangeFileToLast(pWal); + int64_t passed = walGetSeq() - pWal->lastRollSeq; + if(pWal->rollPeriod != -1 && passed > pWal->rollPeriod) { + walRoll(pWal); + } else if(pWal->segSize != -1 && walGetLastFileSize(pWal) > pWal->segSize) { + walRoll(pWal); + } } } else { //reject skip log or rewrite log //must truncate explicitly first return -1; } - if (!tfValid(pWal->curLogTfd)) return 0; + /*if (!tfValid(pWal->curLogTfd)) return 0;*/ pWal->head.version = index; - int32_t code = 0; pWal->head.signature = WAL_SIGNATURE; pWal->head.len = bodyLen; @@ -250,19 +281,23 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, void *body, int32_t if (tfWrite(pWal->curLogTfd, &pWal->head, sizeof(SWalHead)) != sizeof(SWalHead)) { //ftruncate code = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno)); + wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(errno)); } if (tfWrite(pWal->curLogTfd, &body, bodyLen) != bodyLen) { //ftruncate code = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno)); + wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(errno)); + } + code = walWriteIndex(pWal, index, walGetCurFileOffset(pWal)); + if(code != 0) { + //TODO } - walWriteIndex(pWal, index, pWal->curLogOffset); - pWal->curLogOffset += sizeof(SWalHead) + bodyLen; //set status pWal->lastVersion = index; + walGetCurFileInfo(pWal)->lastVer = index; + walGetCurFileInfo(pWal)->fileSize += sizeof(SWalHead) + bodyLen; pthread_mutex_unlock(&pWal->mutex); @@ -273,9 +308,9 @@ void walFsync(SWal *pWal, bool forceFsync) { if (pWal == NULL || !tfValid(pWal->curLogTfd)) return; if (forceFsync || (pWal->level == TAOS_WAL_FSYNC && pWal->fsyncPeriod == 0)) { - wTrace("vgId:%d, fileId:%"PRId64".log, do fsync", pWal->vgId, pWal->curFileFirstVersion); + wTrace("vgId:%d, fileId:%"PRId64".log, do fsync", pWal->vgId, walGetCurFileFirstVer(pWal)); if (tfFsync(pWal->curLogTfd) < 0) { - wError("vgId:%d, file:%"PRId64".log, fsync failed since %s", pWal->vgId, pWal->curFileFirstVersion, strerror(errno)); + wError("vgId:%d, file:%"PRId64".log, fsync failed since %s", pWal->vgId, walGetCurFileFirstVer(pWal), strerror(errno)); } } } @@ -348,8 +383,36 @@ int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) { } #endif -static void walFtruncate(SWal *pWal, int64_t tfd, int64_t offset) { - tfFtruncate(tfd, offset); +static int walValidateOffset(SWal* pWal, int64_t ver) { + int code = 0; + SWalHead *pHead = NULL; + code = (int)walRead(pWal, &pHead, ver); + if(pHead->version != ver) { + return -1; + } + return 0; +} + +static int64_t walGetOffset(SWal* pWal, int64_t ver) { + int code = walSeekVer(pWal, ver); + if(code != 0) { + return -1; + } + + code = walValidateOffset(pWal, ver); + if(code != 0) { + return -1; + } + + return 0; +} + +static void walFtruncate(SWal *pWal, int64_t ver) { + int64_t tfd = pWal->curLogTfd; + tfFtruncate(tfd, ver); + tfFsync(tfd); + tfd = pWal->curIdxTfd; + tfFtruncate(tfd, ver * WAL_IDX_ENTRY_SIZE); tfFsync(tfd); } diff --git a/source/libs/wal/test/CMakeLists.txt b/source/libs/wal/test/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..1c0a3a162a3bd40abda62f20fc0a4ee6ccda1087 --- /dev/null +++ b/source/libs/wal/test/CMakeLists.txt @@ -0,0 +1,20 @@ +add_executable(walTest "") +target_sources(walTest + PRIVATE + "walMetaTest.cpp" +) +target_include_directories(walTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/wal" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) + +target_link_libraries(walTest + wal + gtest_main +) +enable_testing() +add_test( + NAME wal_test + COMMAND walTest +) diff --git a/source/libs/wal/test/walMetaTest.cpp b/source/libs/wal/test/walMetaTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..4c0533d3895cc8f3ba66e166ae2b85b3278f2599 --- /dev/null +++ b/source/libs/wal/test/walMetaTest.cpp @@ -0,0 +1,157 @@ +#include +#include +#include +#include + +#include "tfile.h" +#include "walInt.h" + +class WalCleanEnv : public ::testing::Test { + protected: + static void SetUpTestCase() { + int code = walInit(); + ASSERT(code == 0); + code = tfInit(); + ASSERT(code == 0); + } + + static void TearDownTestCase() { + walCleanUp(); + tfCleanup(); + } + + void SetUp() override { + taosRemoveDir(pathName); + SWalCfg* pCfg = (SWalCfg*)malloc(sizeof(SWal)); + memset(pCfg, 0, sizeof(SWalCfg)); + pCfg->rollPeriod = -1; + pCfg->segSize = -1; + pCfg->walLevel = TAOS_WAL_FSYNC; + pWal = walOpen(pathName, pCfg); + ASSERT(pWal != NULL); + } + + void TearDown() override { + walClose(pWal); + pWal = NULL; + } + + SWal* pWal = NULL; + const char* pathName = "/tmp/wal_test"; +}; + +class WalKeepEnv : public ::testing::Test { + protected: + static void SetUpTestCase() { + int code = walInit(); + ASSERT(code == 0); + code = tfInit(); + ASSERT(code == 0); + } + + static void TearDownTestCase() { + walCleanUp(); + tfCleanup(); + } + + void SetUp() override { + SWalCfg* pCfg = (SWalCfg*)malloc(sizeof(SWal)); + memset(pCfg, 0, sizeof(SWalCfg)); + pCfg->rollPeriod = -1; + pCfg->segSize = -1; + pCfg->walLevel = TAOS_WAL_FSYNC; + pWal = walOpen(pathName, pCfg); + ASSERT(pWal != NULL); + } + + void TearDown() override { + walClose(pWal); + pWal = NULL; + } + + SWal* pWal = NULL; + const char* pathName = "/tmp/wal_test"; +}; + +TEST_F(WalCleanEnv, createNew) { + walRollFileInfo(pWal); + ASSERT(pWal->fileInfoSet != NULL); + ASSERT_EQ(pWal->fileInfoSet->size, 1); + WalFileInfo* pInfo = (WalFileInfo*)taosArrayGetLast(pWal->fileInfoSet); + ASSERT_EQ(pInfo->firstVer, 0); + ASSERT_EQ(pInfo->lastVer, -1); + ASSERT_EQ(pInfo->closeTs, -1); + ASSERT_EQ(pInfo->fileSize, 0); +} + +TEST_F(WalCleanEnv, serialize) { + int code = walRollFileInfo(pWal); + ASSERT(code == 0); + ASSERT(pWal->fileInfoSet != NULL); + + code = walRollFileInfo(pWal); + ASSERT(code == 0); + code = walRollFileInfo(pWal); + ASSERT(code == 0); + code = walRollFileInfo(pWal); + ASSERT(code == 0); + code = walRollFileInfo(pWal); + ASSERT(code == 0); + code = walRollFileInfo(pWal); + ASSERT(code == 0); + char*ss = walFileInfoSerialize(pWal); + printf("%s\n", ss); + code = walWriteMeta(pWal); + ASSERT(code == 0); +} + +TEST_F(WalCleanEnv, removeOldMeta) { + int code = walRollFileInfo(pWal); + ASSERT(code == 0); + ASSERT(pWal->fileInfoSet != NULL); + code = walWriteMeta(pWal); + ASSERT(code == 0); + code = walRollFileInfo(pWal); + ASSERT(code == 0); + code = walWriteMeta(pWal); + ASSERT(code == 0); +} + +TEST_F(WalKeepEnv, readOldMeta) { + int code = walRollFileInfo(pWal); + ASSERT(code == 0); + ASSERT(pWal->fileInfoSet != NULL); + code = walWriteMeta(pWal); + ASSERT(code == 0); + code = walRollFileInfo(pWal); + ASSERT(code == 0); + code = walWriteMeta(pWal); + ASSERT(code == 0); + char*oldss = walFileInfoSerialize(pWal); + + TearDown(); + SetUp(); + code = walReadMeta(pWal); + ASSERT(code == 0); + char* newss = walFileInfoSerialize(pWal); + + int len = strlen(oldss); + ASSERT_EQ(len, strlen(newss)); + for(int i = 0; i < len; i++) { + EXPECT_EQ(oldss[i], newss[i]); + } +} + +TEST_F(WalKeepEnv, write) { + const char* ranStr = "tvapq02tcp"; + const int len = strlen(ranStr); + int code; + for(int i = 0; i < 10; i++) { + code = walWrite(pWal, i, i+1, (void*)ranStr, len); + ASSERT_EQ(code, 0); + code = walWrite(pWal, i+2, i, (void*)ranStr, len); + ASSERT_EQ(code, -1); + } + code = walWriteMeta(pWal); + ASSERT_EQ(code, 0); +} diff --git a/source/libs/wal/test/walTests.cpp b/source/libs/wal/test/walTests.cpp deleted file mode 100644 index 505728fbe4c4a6fbc126aa18ff6db93a28388173..0000000000000000000000000000000000000000 --- a/source/libs/wal/test/walTests.cpp +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -//#define _DEFAULT_SOURCE -#include "os.h" -#include "tutil.h" -#include "tglobal.h" -#include "tlog.h" -#include "twal.h" -#include "tfile.h" - -int64_t ver = 0; -void *pWal = NULL; - -int writeToQueue(void *pVnode, void *data, int type, void *pMsg) { - // do nothing - SWalHead *pHead = data; - - if (pHead->version > ver) - ver = pHead->version; - - walWrite(pWal, pHead); - - return 0; -} - -int main(int argc, char *argv[]) { - char path[128] = "/tmp/wal"; - int level = 2; - int total = 5; - int rows = 10000; - int size = 128; - int keep = 0; - - for (int i=1; iversion = ++ver; - pHead->len = size; - walWrite(pWal, pHead); - } - - printf("renew a wal, i:%d\n", i); - walRenew(pWal); - } - - printf("%d wal files are written\n", total); - - int64_t index = 0; - char name[256]; - - while (1) { - int code = walGetWalFile(pWal, name, &index); - if (code == -1) { - printf("failed to get wal file, index:%" PRId64 "\n", index); - break; - } - - printf("index:%" PRId64 " wal:%s\n", index, name); - if (code == 0) break; - } - - getchar(); - - walClose(pWal); - walCleanUp(); - tfCleanup(); - - return 0; -}