diff --git a/include/dnode/vnode/tq/tq.h b/include/dnode/vnode/tq/tq.h index 85533f65bc25154ead1c8771f7f06c31fad0e3ce..7359df92cc680c7c052131e843ed158dcf4ff8f4 100644 --- a/include/dnode/vnode/tq/tq.h +++ b/include/dnode/vnode/tq/tq.h @@ -256,7 +256,7 @@ typedef struct STQ { // the collection of group handle // the handle of kvstore char* path; - STqCfg* tqConfig; + STqCfg* tqConfig; TqLogReader* tqLogReader; TqMemRef tqMemRef; TqMetaStore* tqMeta; diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 48c775292a1e9f9fa3c5b828169389351df99a4a..ae1e630c6fff6caa46e897dec51207fe4ce3386c 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -44,7 +44,7 @@ typedef struct SWalReadHead { int8_t reserved[2]; int32_t len; int64_t version; - char cont[]; + char body[]; } SWalReadHead; typedef struct { @@ -52,9 +52,9 @@ typedef struct { int32_t fsyncPeriod; // millisecond int32_t retentionPeriod; // secs int32_t rollPeriod; // secs - int32_t retentionSize; // secs + int64_t retentionSize; int64_t segSize; - EWalType walLevel; // wal level + EWalType level; // wal level } SWalCfg; typedef struct { @@ -90,15 +90,17 @@ typedef struct { #define WAL_CUR_FILE_WRITABLE 2 #define WAL_CUR_FAILED 4 +typedef struct SWalVer { + int64_t firstVer; + int64_t verInSnapshotting; + int64_t snapshotVer; + int64_t commitVer; + int64_t lastVer; +} SWalVer; + typedef struct SWal { // cfg - int32_t vgId; - int32_t fsyncPeriod; // millisecond - int32_t rollPeriod; // second - int64_t segSize; - int64_t retentionSize; - int32_t retentionPeriod; - EWalType level; + SWalCfg cfg; //total size int64_t totSize; //fsync seq @@ -109,12 +111,7 @@ typedef struct SWal { int64_t writeLogTfd; int64_t writeIdxTfd; //wal lifecycle - int64_t firstVersion; - int64_t snapshotVersion; - int64_t commitVersion; - int64_t lastVersion; - //snapshotting version - int64_t snapshottingVer; + SWalVer vers; //roll status int64_t lastRollSeq; //file set @@ -126,9 +123,20 @@ typedef struct SWal { //path char path[WAL_PATH_LEN]; //reusable write head - SWalHead head; + SWalHead writeHead; } SWal; // WAL HANDLE +typedef struct SWalReadHandle { + SWal* pWal; + int64_t readLogTfd; + int64_t readIdxTfd; + int64_t curFileFirstVer; + int64_t curVersion; + int64_t capacity; + int64_t status; //if cursor valid + SWalHead head; +} SWalReadHandle; + typedef int32_t (*FWalWrite)(void *ahandle, void *pHead); // module initialization @@ -154,6 +162,10 @@ int32_t walEndTakeSnapshot(SWal *); //int32_t walDataCorrupted(SWal*); // read +SWalReadHandle* walOpenReadHandle(SWal *); +void walCloseReadHandle(SWalReadHandle *); +int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver); + int32_t walRead(SWal *, SWalHead **, int64_t ver); int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readNum); diff --git a/source/libs/wal/inc/walInt.h b/source/libs/wal/inc/walInt.h index f8f2e2eadd35fb67cb555c4e60e6eecdc6cefce9..ec01f7d7fc91e7689dffbf53d266f7e4f652ee29 100644 --- a/source/libs/wal/inc/walInt.h +++ b/source/libs/wal/inc/walInt.h @@ -90,7 +90,7 @@ static inline int walValidHeadCksum(SWalHead* pHead) { } static inline int walValidBodyCksum(SWalHead* pHead) { - return taosCheckChecksum((uint8_t*)pHead->head.cont, pHead->head.len, pHead->cksumBody); + return taosCheckChecksum((uint8_t*)pHead->head.body, pHead->head.len, pHead->cksumBody); } static inline int walValidCksum(SWalHead *pHead, void* body, int64_t bodyLen) { diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 705cf051be8055b902ed6c3899491b7c99d55212..49f4fde3a0331ea19b10599b1fd52a2429d979ac 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -25,15 +25,15 @@ #include int64_t walGetFirstVer(SWal *pWal) { - return pWal->firstVersion; + return pWal->vers.firstVer; } int64_t walGetSnaphostVer(SWal *pWal) { - return pWal->snapshotVersion; + return pWal->vers.snapshotVer; } int64_t walGetLastVer(SWal *pWal) { - return pWal->lastVersion; + return pWal->vers.lastVer; } int walRollFileInfo(SWal* pWal) { @@ -42,7 +42,7 @@ int walRollFileInfo(SWal* pWal) { SArray* pArray = pWal->fileInfoSet; if(taosArrayGetSize(pArray) != 0) { WalFileInfo *pInfo = taosArrayGetLast(pArray); - pInfo->lastVer = pWal->lastVersion; + pInfo->lastVer = pWal->vers.lastVer; pInfo->closeTs = ts; } @@ -51,7 +51,7 @@ int walRollFileInfo(SWal* pWal) { if(pNewInfo == NULL) { return -1; } - pNewInfo->firstVer = pWal->lastVersion + 1; + pNewInfo->firstVer = pWal->vers.lastVer + 1; pNewInfo->lastVer = -1; pNewInfo->createTs = ts; pNewInfo->closeTs = -1; @@ -74,13 +74,13 @@ char* walMetaSerialize(SWal* pWal) { return NULL; } cJSON_AddItemToObject(pRoot, "meta", pMeta); - sprintf(buf, "%" PRId64, pWal->firstVersion); + sprintf(buf, "%" PRId64, pWal->vers.firstVer); cJSON_AddStringToObject(pMeta, "firstVer", buf); - sprintf(buf, "%" PRId64, pWal->snapshotVersion); + sprintf(buf, "%" PRId64, pWal->vers.snapshotVer); cJSON_AddStringToObject(pMeta, "snapshotVer", buf); - sprintf(buf, "%" PRId64, pWal->commitVersion); + sprintf(buf, "%" PRId64, pWal->vers.commitVer); cJSON_AddStringToObject(pMeta, "commitVer", buf); - sprintf(buf, "%" PRId64, pWal->lastVersion); + sprintf(buf, "%" PRId64, pWal->vers.lastVer); cJSON_AddStringToObject(pMeta, "lastVer", buf); cJSON_AddItemToObject(pRoot, "files", pFiles); @@ -116,13 +116,13 @@ int walMetaDeserialize(SWal* pWal, const char* bytes) { pRoot = cJSON_Parse(bytes); pMeta = cJSON_GetObjectItem(pRoot, "meta"); pField = cJSON_GetObjectItem(pMeta, "firstVer"); - pWal->firstVersion = atoll(cJSON_GetStringValue(pField)); + pWal->vers.firstVer = atoll(cJSON_GetStringValue(pField)); pField = cJSON_GetObjectItem(pMeta, "snapshotVer"); - pWal->snapshotVersion = atoll(cJSON_GetStringValue(pField)); + pWal->vers.snapshotVer = atoll(cJSON_GetStringValue(pField)); pField = cJSON_GetObjectItem(pMeta, "commitVer"); - pWal->commitVersion = atoll(cJSON_GetStringValue(pField)); + pWal->vers.commitVer = atoll(cJSON_GetStringValue(pField)); pField = cJSON_GetObjectItem(pMeta, "lastVer"); - pWal->lastVersion = atoll(cJSON_GetStringValue(pField)); + pWal->vers.lastVer = atoll(cJSON_GetStringValue(pField)); pFiles = cJSON_GetObjectItem(pRoot, "files"); int sz = cJSON_GetArraySize(pFiles); @@ -161,7 +161,7 @@ static int walFindCurMetaVer(SWal* pWal) { DIR *dir = opendir(pWal->path); if(dir == NULL) { - wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno)); + wError("vgId:%d, path:%s, failed to open since %s", pWal->cfg.vgId, pWal->path, strerror(errno)); return -1; } diff --git a/source/libs/wal/src/walMgmt.c b/source/libs/wal/src/walMgmt.c index a47f27f1425efbb9b7f52ecab69852839534caa5..31f2ef037aed0b2bb90248b932ba687a2f7c4bd3 100644 --- a/source/libs/wal/src/walMgmt.c +++ b/source/libs/wal/src/walMgmt.c @@ -86,21 +86,15 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { pWal->writeCur = -1; //set config - pWal->vgId = pCfg->vgId; - pWal->fsyncPeriod = pCfg->fsyncPeriod; - pWal->rollPeriod = pCfg->rollPeriod; - pWal->segSize = pCfg->segSize; - pWal->retentionSize = pCfg->retentionSize; - pWal->retentionPeriod = pCfg->retentionPeriod; - pWal->level = pCfg->walLevel; + memcpy(&pWal->cfg, pCfg, sizeof(SWalCfg)); //init version info - pWal->firstVersion = -1; - pWal->commitVersion = -1; - pWal->snapshotVersion = -1; - pWal->lastVersion = -1; + pWal->vers.firstVer = -1; + pWal->vers.commitVer = -1; + pWal->vers.snapshotVer = -1; + pWal->vers.lastVer = -1; - pWal->snapshottingVer = -1; + pWal->vers.verInSnapshotting = -1; pWal->totSize = 0; @@ -108,8 +102,8 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { pWal->lastRollSeq = -1; //init write buffer - memset(&pWal->head, 0, sizeof(SWalHead)); - pWal->head.head.sver = 0; + memset(&pWal->writeHead, 0, sizeof(SWalHead)); + pWal->writeHead.head.sver = 0; tstrncpy(pWal->path, path, sizeof(pWal->path)); pthread_mutex_init(&pWal->mutex, NULL); @@ -129,7 +123,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { } walReadMeta(pWal); - wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->vgId, pWal, pWal->level, pWal->fsyncPeriod); + wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level, pWal->cfg.fsyncPeriod); return pWal; } @@ -137,17 +131,17 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { int32_t walAlter(SWal *pWal, SWalCfg *pCfg) { if (pWal == NULL) return TSDB_CODE_WAL_APP_ERROR; - if (pWal->level == pCfg->walLevel && pWal->fsyncPeriod == pCfg->fsyncPeriod) { - wDebug("vgId:%d, old walLevel:%d fsync:%d, new walLevel:%d fsync:%d not change", pWal->vgId, pWal->level, - pWal->fsyncPeriod, pCfg->walLevel, pCfg->fsyncPeriod); + if (pWal->cfg.level == pCfg->level && pWal->cfg.fsyncPeriod == pCfg->fsyncPeriod) { + wDebug("vgId:%d, old walLevel:%d fsync:%d, new walLevel:%d fsync:%d not change", pWal->cfg.vgId, pWal->cfg.level, + pWal->cfg.fsyncPeriod, pCfg->level, pCfg->fsyncPeriod); return 0; } - wInfo("vgId:%d, change old walLevel:%d fsync:%d, new walLevel:%d fsync:%d", pWal->vgId, pWal->level, - pWal->fsyncPeriod, pCfg->walLevel, pCfg->fsyncPeriod); + wInfo("vgId:%d, change old walLevel:%d fsync:%d, new walLevel:%d fsync:%d", pWal->cfg.vgId, pWal->cfg.level, + pWal->cfg.fsyncPeriod, pCfg->level, pCfg->fsyncPeriod); - pWal->level = pCfg->walLevel; - pWal->fsyncPeriod = pCfg->fsyncPeriod; + pWal->cfg.level = pCfg->level; + pWal->cfg.fsyncPeriod = pCfg->fsyncPeriod; pWal->fsyncSeq = pCfg->fsyncPeriod / 1000; if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1; @@ -171,22 +165,22 @@ void walClose(SWal *pWal) { static int32_t walInitObj(SWal *pWal) { if (taosMkDir(pWal->path) != 0) { - wError("vgId:%d, path:%s, failed to create directory since %s", pWal->vgId, pWal->path, strerror(errno)); + wError("vgId:%d, path:%s, failed to create directory since %s", pWal->cfg.vgId, pWal->path, strerror(errno)); return TAOS_SYSTEM_ERROR(errno); } pWal->fileInfoSet = taosArrayInit(8, sizeof(WalFileInfo)); if(pWal->fileInfoSet == NULL) { - wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->vgId, pWal->path, strerror(errno)); + wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->cfg.vgId, pWal->path, strerror(errno)); return TAOS_SYSTEM_ERROR(errno); } - wDebug("vgId:%d, object is initialized", pWal->vgId); + wDebug("vgId:%d, object is initialized", pWal->cfg.vgId); return 0; } static void walFreeObj(void *wal) { SWal *pWal = wal; - wDebug("vgId:%d, wal:%p is freed", pWal->vgId, pWal); + wDebug("vgId:%d, wal:%p is freed", pWal->cfg.vgId, pWal); tfClose(pWal->writeLogTfd); tfClose(pWal->writeIdxTfd); @@ -197,7 +191,7 @@ static void walFreeObj(void *wal) { } static bool walNeedFsync(SWal *pWal) { - if (pWal->fsyncPeriod <= 0 || pWal->level != TAOS_WAL_FSYNC) { + if (pWal->cfg.fsyncPeriod <= 0 || pWal->cfg.level != TAOS_WAL_FSYNC) { return false; } @@ -217,10 +211,10 @@ static void walFsyncAll() { SWal *pWal = taosIterateRef(tsWal.refSetId, 0); while (pWal) { if (walNeedFsync(pWal)) { - wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, atomic_load_32(&tsWal.seq)); + wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->cfg.vgId, pWal->cfg.level, pWal->fsyncSeq, atomic_load_32(&tsWal.seq)); int32_t code = tfFsync(pWal->writeLogTfd); if (code != 0) { - wError("vgId:%d, file:%"PRId64".log, failed to fsync since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(code)); + wError("vgId:%d, file:%"PRId64".log, failed to fsync since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(code)); } } pWal = taosIterateRef(tsWal.refSetId, pWal->refId); diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 27dffddf83e129888264a8347a108898308aa40a..554a5c846b5df84e76d192bb588a51e7e2020ca5 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -16,6 +16,147 @@ #include "walInt.h" #include "tfile.h" +SWalReadHandle* walOpenReadHandle(SWal* pWal) { + SWalReadHandle *pRead = malloc(sizeof(SWalReadHandle)); + if(pRead == NULL) { + return NULL; + } + memset(pRead, 0, sizeof(SWalReadHandle)); + pRead->pWal = pWal; + pRead->readIdxTfd = -1; + pRead->readLogTfd = -1; + return NULL; +} + +void walCloseReadHandle(SWalReadHandle *pRead) { + tfClose(pRead->readIdxTfd); + tfClose(pRead->readLogTfd); + free(pRead); +} + +int32_t walRegisterRead(SWalReadHandle *pRead, int64_t ver) { + return 0; +} + +static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, int64_t ver) { + int code = 0; + + int64_t idxTfd = pRead->readIdxTfd; + int64_t logTfd = pRead->readLogTfd; + + //seek position + int64_t offset = (ver - fileFirstVer) * WAL_IDX_ENTRY_SIZE; + code = tfLseek(idxTfd, offset, SEEK_SET); + if(code != 0) { + return -1; + } + WalIdxEntry entry; + code = tfRead(idxTfd, &entry, sizeof(WalIdxEntry)); + if(code != 0) { + return -1; + } + //TODO:deserialize + ASSERT(entry.ver == ver); + code = tfLseek(logTfd, entry.offset, SEEK_SET); + if (code != 0) { + return -1; + } + return code; +} + +static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) { + char fnameStr[WAL_FILE_LEN]; + + tfClose(pRead->readIdxTfd); + tfClose(pRead->readLogTfd); + + walBuildLogName(pRead->pWal, fileFirstVer, fnameStr); + int logTfd = tfOpenRead(fnameStr); + if(logTfd < 0) { + return -1; + } + + walBuildIdxName(pRead->pWal, fileFirstVer, fnameStr); + int idxTfd = tfOpenRead(fnameStr); + if(idxTfd < 0) { + return -1; + } + + pRead->readLogTfd = logTfd; + pRead->readIdxTfd = idxTfd; + return 0; +} + +static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) { + int code; + SWal *pWal = pRead->pWal; + if(ver == pWal->vers.lastVer) { + return 0; + } + if(ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) { + return -1; + } + if(ver < pWal->vers.snapshotVer) { + + } + + WalFileInfo tmpInfo; + tmpInfo.firstVer = ver; + //bsearch in fileSet + WalFileInfo* pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); + ASSERT(pRet != NULL); + if(pRead->curFileFirstVer != pRet->firstVer) { + code = walReadChangeFile(pRead, pRet->firstVer); + if(code < 0) { + //TODO: set error flag + return -1; + } + } + + code = walReadSeekFilePos(pRead, pRet->firstVer, ver); + if(code < 0) { + return -1; + } + pRead->curVersion = ver; + + return 0; +} + +int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { + int code; + //TODO: check wal life + if(pRead->curVersion != ver) { + walReadSeekVer(pRead, ver); + } + + if(!tfValid(pRead->readLogTfd)) return -1; + + if(sizeof(SWalHead) != tfRead(pRead->readLogTfd, &pRead->head, sizeof(SWalHead))) { + return -1; + } + code = walValidHeadCksum(&pRead->head); + if(code != 0) { + return -1; + } + if(pRead->capacity < pRead->head.head.len) { + void* ptr = realloc(pRead, pRead->head.head.len); + if(ptr == NULL) { + return -1; + } + pRead = ptr; + pRead->capacity = pRead->head.head.len; + } + if(pRead->head.head.len != tfRead(pRead->readLogTfd, &pRead->head.head.body, pRead->head.head.len)) { + return -1; + } + code = walValidBodyCksum(&pRead->head); + if(code != 0) { + return -1; + } + + return 0; +} + int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) { int code; code = walSeekVer(pWal, ver); @@ -42,7 +183,7 @@ int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) { *ppHead = NULL; return -1; } - if(tfRead(pWal->writeLogTfd, (*ppHead)->head.cont, (*ppHead)->head.len) != (*ppHead)->head.len) { + if(tfRead(pWal->writeLogTfd, (*ppHead)->head.body, (*ppHead)->head.len) != (*ppHead)->head.len) { return -1; } //TODO: endian compatibility processing after read diff --git a/source/libs/wal/src/walSeek.c b/source/libs/wal/src/walSeek.c index 48272f8f327f26cd3a30147eff8c8328830ef3f3..953aae703c75764acf047851022b15c79b88cc77 100644 --- a/source/libs/wal/src/walSeek.c +++ b/source/libs/wal/src/walSeek.c @@ -78,10 +78,12 @@ int walChangeFile(SWal *pWal, int64_t ver) { code = tfClose(pWal->writeLogTfd); if(code != 0) { //TODO + return -1; } code = tfClose(pWal->writeIdxTfd); if(code != 0) { //TODO + return -1; } WalFileInfo tmpInfo; tmpInfo.firstVer = ver; @@ -106,24 +108,19 @@ int walChangeFile(SWal *pWal, int64_t ver) { pWal->writeLogTfd = logTfd; pWal->writeIdxTfd = idxTfd; - return code; -} - -int walGetVerOffset(SWal* pWal, int64_t ver) { - int code; - return 0; + return fileFirstVer; } int walSeekVer(SWal *pWal, int64_t ver) { int code; - if(ver == pWal->lastVersion) { + if(ver == pWal->vers.lastVer) { return 0; } - if(ver > pWal->lastVersion || ver < pWal->firstVersion) { + if(ver > pWal->vers.lastVer|| ver < pWal->vers.firstVer) { return -1; } - if(ver < pWal->snapshotVersion) { - //TODO: set flag to prevent roll back + if(ver < pWal->vers.snapshotVer) { + } if(ver < walGetCurFileFirstVer(pWal) || (ver > walGetCurFileLastVer(pWal))) { code = walChangeFile(pWal, ver); diff --git a/source/libs/wal/src/walUtil.c b/source/libs/wal/src/walUtil.c index c88cc918fe42c30564abe3feb5ae273386d1b994..849d0c3e5189acdd6fdd50a6ef3210351ffbb64d 100644 --- a/source/libs/wal/src/walUtil.c +++ b/source/libs/wal/src/walUtil.c @@ -17,6 +17,7 @@ #include "os.h" #include "walInt.h" +#if 0 int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId) { int64_t curFileId = *nextFileId; int64_t minFileId = INT64_MAX; @@ -116,3 +117,4 @@ int32_t walGetNewFile(SWal *pWal, int64_t *newFileId) { return 0; } +#endif diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 972ac5c68240cd9d5739c7f82e2e4fe154a54934..44e8cec153f7791e7741244c8f1c5d804ef72e24 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -114,22 +114,22 @@ void walRemoveAllOldFiles(void *handle) { #endif int32_t walCommit(SWal *pWal, int64_t ver) { - ASSERT(pWal->commitVersion >= pWal->snapshotVersion); - ASSERT(pWal->commitVersion <= pWal->lastVersion); - if(ver < pWal->commitVersion || ver > pWal->lastVersion) { + ASSERT(pWal->vers.commitVer >= pWal->vers.snapshotVer); + ASSERT(pWal->vers.commitVer <= pWal->vers.lastVer); + if(ver < pWal->vers.commitVer || ver > pWal->vers.lastVer) { return -1; } - pWal->commitVersion = ver; + pWal->vers.commitVer = ver; return 0; } int32_t walRollback(SWal *pWal, int64_t ver) { int code; char fnameStr[WAL_FILE_LEN]; - if(ver == pWal->lastVersion) { + if(ver == pWal->vers.lastVer) { return 0; } - if(ver > pWal->lastVersion || ver < pWal->commitVersion) { + if(ver > pWal->vers.lastVer || ver < pWal->vers.commitVer) { return -1; } pthread_mutex_lock(&pWal->mutex); @@ -220,7 +220,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) { if(code < 0) { return -1; } - pWal->lastVersion = ver - 1; + pWal->vers.lastVer = ver - 1; ((WalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->lastVer = ver - 1; ((WalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->fileSize = entry.offset; @@ -230,9 +230,9 @@ int32_t walRollback(SWal *pWal, int64_t ver) { } int32_t walBeginTakeSnapshot(SWal* pWal, int64_t ver) { - pWal->snapshottingVer = ver; + pWal->vers.verInSnapshotting = ver; //check file rolling - if(pWal->retentionPeriod == 0) { + if(pWal->cfg.retentionPeriod == 0) { walRoll(pWal); } @@ -240,10 +240,10 @@ int32_t walBeginTakeSnapshot(SWal* pWal, int64_t ver) { } int32_t walEndTakeSnapshot(SWal *pWal) { - int64_t ver = pWal->snapshottingVer; + int64_t ver = pWal->vers.verInSnapshotting; if(ver == -1) return -1; - pWal->snapshotVersion = ver; + pWal->vers.snapshotVer = ver; int ts = taosGetTimestampSec(); int deleteCnt = 0; @@ -257,8 +257,8 @@ int32_t walEndTakeSnapshot(SWal *pWal) { } //iterate files, until the searched result for(WalFileInfo* iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) { - if(pWal->totSize > pWal->retentionSize || - iter->closeTs + pWal->retentionPeriod > ts) { + if(pWal->totSize > pWal->cfg.retentionSize || + iter->closeTs + pWal->cfg.retentionPeriod > ts) { //delete according to file size or close time deleteCnt++; newTotSize -= iter->fileSize; @@ -278,13 +278,13 @@ int32_t walEndTakeSnapshot(SWal *pWal) { taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt); if(taosArrayGetSize(pWal->fileInfoSet) == 0) { pWal->writeCur = -1; - pWal->firstVersion = -1; + pWal->vers.firstVer = -1; } else { - pWal->firstVersion = ((WalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer; + pWal->vers.firstVer = ((WalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer; } pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;; pWal->totSize = newTotSize; - pWal->snapshottingVer = -1; + pWal->vers.verInSnapshotting = -1; //save snapshot ver, commit ver int code = walWriteMeta(pWal); @@ -311,7 +311,7 @@ int walRoll(SWal *pWal) { } int64_t idxTfd, logTfd; //create new file - int64_t newFileFirstVersion = pWal->lastVersion + 1; + int64_t newFileFirstVersion = pWal->vers.lastVer + 1; char fnameStr[WAL_FILE_LEN]; walBuildIdxName(pWal, newFileFirstVersion, fnameStr); idxTfd = tfOpenCreateWrite(fnameStr); @@ -357,18 +357,18 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i int code = 0; // no wal - if (pWal->level == TAOS_WAL_NOLOG) return 0; + if (pWal->cfg.level == TAOS_WAL_NOLOG) return 0; - if (index == pWal->lastVersion + 1) { + if (index == pWal->vers.lastVer + 1) { if(taosArrayGetSize(pWal->fileInfoSet) == 0) { - pWal->firstVersion = index; + pWal->vers.firstVer = index; code = walRoll(pWal); ASSERT(code == 0); } else { int64_t passed = walGetSeq() - pWal->lastRollSeq; - if(pWal->rollPeriod != -1 && pWal->rollPeriod != 0 && passed > pWal->rollPeriod) { + if(pWal->cfg.rollPeriod != -1 && pWal->cfg.rollPeriod != 0 && passed > pWal->cfg.rollPeriod) { walRoll(pWal); - } else if(pWal->segSize != -1 && pWal->segSize != 0 && walGetLastFileSize(pWal) > pWal->segSize) { + } else if(pWal->cfg.segSize != -1 && pWal->cfg.segSize != 0 && walGetLastFileSize(pWal) > pWal->cfg.segSize) { walRoll(pWal); } } @@ -380,23 +380,23 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i /*if (!tfValid(pWal->curLogTfd)) return 0;*/ pthread_mutex_lock(&pWal->mutex); - pWal->head.head.version = index; + pWal->writeHead.head.version = index; - pWal->head.head.len = bodyLen; - pWal->head.head.msgType = msgType; - pWal->head.cksumHead = walCalcHeadCksum(&pWal->head); - pWal->head.cksumBody = walCalcBodyCksum(body, bodyLen); + pWal->writeHead.head.len = bodyLen; + pWal->writeHead.head.msgType = msgType; + pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead); + pWal->writeHead.cksumBody = walCalcBodyCksum(body, bodyLen); - if (tfWrite(pWal->writeLogTfd, &pWal->head, sizeof(SWalHead)) != sizeof(SWalHead)) { + if (tfWrite(pWal->writeLogTfd, &pWal->writeHead, sizeof(SWalHead)) != sizeof(SWalHead)) { //ftruncate code = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(errno)); + wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(errno)); } if (tfWrite(pWal->writeLogTfd, &body, bodyLen) != bodyLen) { //ftruncate code = TAOS_SYSTEM_ERROR(errno); - wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(errno)); + wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(errno)); } code = walWriteIndex(pWal, index, walGetCurFileOffset(pWal)); if(code != 0) { @@ -405,7 +405,7 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i } //set status - pWal->lastVersion = index; + pWal->vers.lastVer = index; pWal->totSize += sizeof(SWalHead) + bodyLen; walGetCurFileInfo(pWal)->lastVer = index; walGetCurFileInfo(pWal)->fileSize += sizeof(SWalHead) + bodyLen; @@ -416,10 +416,10 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i } void walFsync(SWal *pWal, bool forceFsync) { - if (forceFsync || (pWal->level == TAOS_WAL_FSYNC && pWal->fsyncPeriod == 0)) { - wTrace("vgId:%d, fileId:%"PRId64".log, do fsync", pWal->vgId, walGetCurFileFirstVer(pWal)); + if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) { + wTrace("vgId:%d, fileId:%"PRId64".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal)); if (tfFsync(pWal->writeLogTfd) < 0) { - wError("vgId:%d, file:%"PRId64".log, fsync failed since %s", pWal->vgId, walGetCurFileFirstVer(pWal), strerror(errno)); + wError("vgId:%d, file:%"PRId64".log, fsync failed since %s", pWal->cfg.vgId, walGetCurFileFirstVer(pWal), strerror(errno)); } } } @@ -492,29 +492,29 @@ int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) { } #endif -static int walValidateOffset(SWal* pWal, int64_t ver) { - int code = 0; - SWalHead *pHead = NULL; - code = (int)walRead(pWal, &pHead, ver); - if(pHead->head.version != ver) { - return -1; - } - return 0; -} +/*static int walValidateOffset(SWal* pWal, int64_t ver) {*/ + /*int code = 0;*/ + /*SWalHead *pHead = NULL;*/ + /*code = (int)walRead(pWal, &pHead, ver);*/ + /*if(pHead->head.version != ver) {*/ + /*return -1;*/ + /*}*/ + /*return 0;*/ +/*}*/ -static int64_t walGetOffset(SWal* pWal, int64_t ver) { - int code = walSeekVer(pWal, ver); - if(code != 0) { - return -1; - } +/*static int64_t walGetOffset(SWal* pWal, int64_t ver) {*/ + /*int code = walSeekVer(pWal, ver);*/ + /*if(code != 0) {*/ + /*return -1;*/ + /*}*/ - code = walValidateOffset(pWal, ver); - if(code != 0) { - return -1; - } + /*code = walValidateOffset(pWal, ver);*/ + /*if(code != 0) {*/ + /*return -1;*/ + /*}*/ - return 0; -} + /*return 0;*/ +/*}*/ #if 0 static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd, int64_t *offset) { diff --git a/source/libs/wal/test/walMetaTest.cpp b/source/libs/wal/test/walMetaTest.cpp index c81343d03b85d27e1c57444db23fbba5e9402300..504f1ada3f7d1b66135879538a173caef43e1f2a 100644 --- a/source/libs/wal/test/walMetaTest.cpp +++ b/source/libs/wal/test/walMetaTest.cpp @@ -24,7 +24,7 @@ class WalCleanEnv : public ::testing::Test { pCfg->segSize = -1; pCfg->retentionPeriod = 0; pCfg->retentionSize = 0; - pCfg->walLevel = TAOS_WAL_FSYNC; + pCfg->level = TAOS_WAL_FSYNC; pWal = walOpen(pathName, pCfg); free(pCfg); ASSERT(pWal != NULL); @@ -56,7 +56,7 @@ class WalCleanDeleteEnv : public ::testing::Test { memset(pCfg, 0, sizeof(SWalCfg)); pCfg->retentionPeriod = 0; pCfg->retentionSize = 0; - pCfg->walLevel = TAOS_WAL_FSYNC; + pCfg->level = TAOS_WAL_FSYNC; pWal = walOpen(pathName, pCfg); free(pCfg); ASSERT(pWal != NULL); @@ -95,7 +95,7 @@ class WalKeepEnv : public ::testing::Test { pCfg->segSize = -1; pCfg->retentionPeriod = 0; pCfg->retentionSize = 0; - pCfg->walLevel = TAOS_WAL_FSYNC; + pCfg->level = TAOS_WAL_FSYNC; pWal = walOpen(pathName, pCfg); free(pCfg); ASSERT(pWal != NULL); @@ -164,18 +164,18 @@ TEST_F(WalKeepEnv, readOldMeta) { for(int i = 0; i < 10; i++) { code = walWrite(pWal, i, i+1, (void*)ranStr, len); ASSERT_EQ(code, 0); - ASSERT_EQ(pWal->lastVersion, i); + ASSERT_EQ(pWal->vers.lastVer, i); code = walWrite(pWal, i+2, i, (void*)ranStr, len); ASSERT_EQ(code, -1); - ASSERT_EQ(pWal->lastVersion, i); + ASSERT_EQ(pWal->vers.lastVer, i); } char* oldss = walMetaSerialize(pWal); TearDown(); SetUp(); - ASSERT_EQ(pWal->firstVersion, 0); - ASSERT_EQ(pWal->lastVersion, 9); + ASSERT_EQ(pWal->vers.firstVer, 0); + ASSERT_EQ(pWal->vers.lastVer, 9); char* newss = walMetaSerialize(pWal); @@ -195,10 +195,10 @@ TEST_F(WalCleanEnv, write) { for(int i = 0; i < 10; i++) { code = walWrite(pWal, i, i+1, (void*)ranStr, len); ASSERT_EQ(code, 0); - ASSERT_EQ(pWal->lastVersion, i); + ASSERT_EQ(pWal->vers.lastVer, i); code = walWrite(pWal, i+2, i, (void*)ranStr, len); ASSERT_EQ(code, -1); - ASSERT_EQ(pWal->lastVersion, i); + ASSERT_EQ(pWal->vers.lastVer, i); } code = walWriteMeta(pWal); ASSERT_EQ(code, 0); @@ -211,14 +211,14 @@ TEST_F(WalCleanEnv, rollback) { for(int i = 0; i < 10; i++) { code = walWrite(pWal, i, i+1, (void*)ranStr, len); ASSERT_EQ(code, 0); - ASSERT_EQ(pWal->lastVersion, i); + ASSERT_EQ(pWal->vers.lastVer, i); } code = walRollback(pWal, 5); ASSERT_EQ(code, 0); - ASSERT_EQ(pWal->lastVersion, 4); + ASSERT_EQ(pWal->vers.lastVer, 4); code = walRollback(pWal, 3); ASSERT_EQ(code, 0); - ASSERT_EQ(pWal->lastVersion, 2); + ASSERT_EQ(pWal->vers.lastVer, 2); code = walWriteMeta(pWal); ASSERT_EQ(code, 0); } @@ -231,16 +231,16 @@ TEST_F(WalCleanDeleteEnv, roll) { for(i = 0; i < 100; i++) { code = walWrite(pWal, i, 0, (void*)ranStr, len); ASSERT_EQ(code, 0); - ASSERT_EQ(pWal->lastVersion, i); + ASSERT_EQ(pWal->vers.lastVer, i); code = walCommit(pWal, i); - ASSERT_EQ(pWal->commitVersion, i); + ASSERT_EQ(pWal->vers.commitVer, i); } walBeginTakeSnapshot(pWal, i-1); - ASSERT_EQ(pWal->snapshottingVer, i-1); + ASSERT_EQ(pWal->vers.verInSnapshotting, i-1); walEndTakeSnapshot(pWal); - ASSERT_EQ(pWal->snapshotVersion, i-1); - ASSERT_EQ(pWal->snapshottingVer, -1); + ASSERT_EQ(pWal->vers.snapshotVer, i-1); + ASSERT_EQ(pWal->vers.verInSnapshotting, -1); code = walWrite(pWal, 5, 0, (void*)ranStr, len); ASSERT_NE(code, 0); @@ -249,7 +249,7 @@ TEST_F(WalCleanDeleteEnv, roll) { code = walWrite(pWal, i, 0, (void*)ranStr, len); ASSERT_EQ(code, 0); code = walCommit(pWal, i); - ASSERT_EQ(pWal->commitVersion, i); + ASSERT_EQ(pWal->vers.commitVer, i); } //code = walWriteMeta(pWal); diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index 6216188496be91d5a2d7e603a854118b1dc60f8f..581a7973432edaf20f8ff986816abd40bd12d816 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -250,7 +250,7 @@ void taosArrayPopFrontBatch(SArray* pArray, size_t cnt) { if(pArray->size == 0) { return; } - memmove(pArray->pData, (char*)pArray->pData + cnt * pArray->elemSize, pArray->size); + memmove(pArray->pData, (char*)pArray->pData + cnt * pArray->elemSize, pArray->size * pArray->elemSize); } void taosArrayPopTailBatch(SArray* pArray, size_t cnt) {