提交 dc245929 编写于 作者: L Liu Jicong

add wal read handle

上级 9569172a
...@@ -256,7 +256,7 @@ typedef struct STQ { ...@@ -256,7 +256,7 @@ typedef struct STQ {
// the collection of group handle // the collection of group handle
// the handle of kvstore // the handle of kvstore
char* path; char* path;
STqCfg* tqConfig; STqCfg* tqConfig;
TqLogReader* tqLogReader; TqLogReader* tqLogReader;
TqMemRef tqMemRef; TqMemRef tqMemRef;
TqMetaStore* tqMeta; TqMetaStore* tqMeta;
......
...@@ -44,7 +44,7 @@ typedef struct SWalReadHead { ...@@ -44,7 +44,7 @@ typedef struct SWalReadHead {
int8_t reserved[2]; int8_t reserved[2];
int32_t len; int32_t len;
int64_t version; int64_t version;
char cont[]; char body[];
} SWalReadHead; } SWalReadHead;
typedef struct { typedef struct {
...@@ -52,9 +52,9 @@ typedef struct { ...@@ -52,9 +52,9 @@ typedef struct {
int32_t fsyncPeriod; // millisecond int32_t fsyncPeriod; // millisecond
int32_t retentionPeriod; // secs int32_t retentionPeriod; // secs
int32_t rollPeriod; // secs int32_t rollPeriod; // secs
int32_t retentionSize; // secs int64_t retentionSize;
int64_t segSize; int64_t segSize;
EWalType walLevel; // wal level EWalType level; // wal level
} SWalCfg; } SWalCfg;
typedef struct { typedef struct {
...@@ -90,15 +90,17 @@ typedef struct { ...@@ -90,15 +90,17 @@ typedef struct {
#define WAL_CUR_FILE_WRITABLE 2 #define WAL_CUR_FILE_WRITABLE 2
#define WAL_CUR_FAILED 4 #define WAL_CUR_FAILED 4
typedef struct SWalVer {
int64_t firstVer;
int64_t verInSnapshotting;
int64_t snapshotVer;
int64_t commitVer;
int64_t lastVer;
} SWalVer;
typedef struct SWal { typedef struct SWal {
// cfg // cfg
int32_t vgId; SWalCfg cfg;
int32_t fsyncPeriod; // millisecond
int32_t rollPeriod; // second
int64_t segSize;
int64_t retentionSize;
int32_t retentionPeriod;
EWalType level;
//total size //total size
int64_t totSize; int64_t totSize;
//fsync seq //fsync seq
...@@ -109,12 +111,7 @@ typedef struct SWal { ...@@ -109,12 +111,7 @@ typedef struct SWal {
int64_t writeLogTfd; int64_t writeLogTfd;
int64_t writeIdxTfd; int64_t writeIdxTfd;
//wal lifecycle //wal lifecycle
int64_t firstVersion; SWalVer vers;
int64_t snapshotVersion;
int64_t commitVersion;
int64_t lastVersion;
//snapshotting version
int64_t snapshottingVer;
//roll status //roll status
int64_t lastRollSeq; int64_t lastRollSeq;
//file set //file set
...@@ -126,9 +123,20 @@ typedef struct SWal { ...@@ -126,9 +123,20 @@ typedef struct SWal {
//path //path
char path[WAL_PATH_LEN]; char path[WAL_PATH_LEN];
//reusable write head //reusable write head
SWalHead head; SWalHead writeHead;
} SWal; // WAL HANDLE } SWal; // WAL HANDLE
typedef struct SWalReadHandle {
SWal* pWal;
int64_t readLogTfd;
int64_t readIdxTfd;
int64_t curFileFirstVer;
int64_t curVersion;
int64_t capacity;
int64_t status; //if cursor valid
SWalHead head;
} SWalReadHandle;
typedef int32_t (*FWalWrite)(void *ahandle, void *pHead); typedef int32_t (*FWalWrite)(void *ahandle, void *pHead);
// module initialization // module initialization
...@@ -154,6 +162,10 @@ int32_t walEndTakeSnapshot(SWal *); ...@@ -154,6 +162,10 @@ int32_t walEndTakeSnapshot(SWal *);
//int32_t walDataCorrupted(SWal*); //int32_t walDataCorrupted(SWal*);
// read // read
SWalReadHandle* walOpenReadHandle(SWal *);
void walCloseReadHandle(SWalReadHandle *);
int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver);
int32_t walRead(SWal *, SWalHead **, int64_t ver); int32_t walRead(SWal *, SWalHead **, int64_t ver);
int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readNum); int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readNum);
......
...@@ -90,7 +90,7 @@ static inline int walValidHeadCksum(SWalHead* pHead) { ...@@ -90,7 +90,7 @@ static inline int walValidHeadCksum(SWalHead* pHead) {
} }
static inline int walValidBodyCksum(SWalHead* pHead) { static inline int walValidBodyCksum(SWalHead* pHead) {
return taosCheckChecksum((uint8_t*)pHead->head.cont, pHead->head.len, pHead->cksumBody); return taosCheckChecksum((uint8_t*)pHead->head.body, pHead->head.len, pHead->cksumBody);
} }
static inline int walValidCksum(SWalHead *pHead, void* body, int64_t bodyLen) { static inline int walValidCksum(SWalHead *pHead, void* body, int64_t bodyLen) {
......
...@@ -25,15 +25,15 @@ ...@@ -25,15 +25,15 @@
#include <regex.h> #include <regex.h>
int64_t walGetFirstVer(SWal *pWal) { int64_t walGetFirstVer(SWal *pWal) {
return pWal->firstVersion; return pWal->vers.firstVer;
} }
int64_t walGetSnaphostVer(SWal *pWal) { int64_t walGetSnaphostVer(SWal *pWal) {
return pWal->snapshotVersion; return pWal->vers.snapshotVer;
} }
int64_t walGetLastVer(SWal *pWal) { int64_t walGetLastVer(SWal *pWal) {
return pWal->lastVersion; return pWal->vers.lastVer;
} }
int walRollFileInfo(SWal* pWal) { int walRollFileInfo(SWal* pWal) {
...@@ -42,7 +42,7 @@ int walRollFileInfo(SWal* pWal) { ...@@ -42,7 +42,7 @@ int walRollFileInfo(SWal* pWal) {
SArray* pArray = pWal->fileInfoSet; SArray* pArray = pWal->fileInfoSet;
if(taosArrayGetSize(pArray) != 0) { if(taosArrayGetSize(pArray) != 0) {
WalFileInfo *pInfo = taosArrayGetLast(pArray); WalFileInfo *pInfo = taosArrayGetLast(pArray);
pInfo->lastVer = pWal->lastVersion; pInfo->lastVer = pWal->vers.lastVer;
pInfo->closeTs = ts; pInfo->closeTs = ts;
} }
...@@ -51,7 +51,7 @@ int walRollFileInfo(SWal* pWal) { ...@@ -51,7 +51,7 @@ int walRollFileInfo(SWal* pWal) {
if(pNewInfo == NULL) { if(pNewInfo == NULL) {
return -1; return -1;
} }
pNewInfo->firstVer = pWal->lastVersion + 1; pNewInfo->firstVer = pWal->vers.lastVer + 1;
pNewInfo->lastVer = -1; pNewInfo->lastVer = -1;
pNewInfo->createTs = ts; pNewInfo->createTs = ts;
pNewInfo->closeTs = -1; pNewInfo->closeTs = -1;
...@@ -74,13 +74,13 @@ char* walMetaSerialize(SWal* pWal) { ...@@ -74,13 +74,13 @@ char* walMetaSerialize(SWal* pWal) {
return NULL; return NULL;
} }
cJSON_AddItemToObject(pRoot, "meta", pMeta); cJSON_AddItemToObject(pRoot, "meta", pMeta);
sprintf(buf, "%" PRId64, pWal->firstVersion); sprintf(buf, "%" PRId64, pWal->vers.firstVer);
cJSON_AddStringToObject(pMeta, "firstVer", buf); cJSON_AddStringToObject(pMeta, "firstVer", buf);
sprintf(buf, "%" PRId64, pWal->snapshotVersion); sprintf(buf, "%" PRId64, pWal->vers.snapshotVer);
cJSON_AddStringToObject(pMeta, "snapshotVer", buf); cJSON_AddStringToObject(pMeta, "snapshotVer", buf);
sprintf(buf, "%" PRId64, pWal->commitVersion); sprintf(buf, "%" PRId64, pWal->vers.commitVer);
cJSON_AddStringToObject(pMeta, "commitVer", buf); cJSON_AddStringToObject(pMeta, "commitVer", buf);
sprintf(buf, "%" PRId64, pWal->lastVersion); sprintf(buf, "%" PRId64, pWal->vers.lastVer);
cJSON_AddStringToObject(pMeta, "lastVer", buf); cJSON_AddStringToObject(pMeta, "lastVer", buf);
cJSON_AddItemToObject(pRoot, "files", pFiles); cJSON_AddItemToObject(pRoot, "files", pFiles);
...@@ -116,13 +116,13 @@ int walMetaDeserialize(SWal* pWal, const char* bytes) { ...@@ -116,13 +116,13 @@ int walMetaDeserialize(SWal* pWal, const char* bytes) {
pRoot = cJSON_Parse(bytes); pRoot = cJSON_Parse(bytes);
pMeta = cJSON_GetObjectItem(pRoot, "meta"); pMeta = cJSON_GetObjectItem(pRoot, "meta");
pField = cJSON_GetObjectItem(pMeta, "firstVer"); pField = cJSON_GetObjectItem(pMeta, "firstVer");
pWal->firstVersion = atoll(cJSON_GetStringValue(pField)); pWal->vers.firstVer = atoll(cJSON_GetStringValue(pField));
pField = cJSON_GetObjectItem(pMeta, "snapshotVer"); pField = cJSON_GetObjectItem(pMeta, "snapshotVer");
pWal->snapshotVersion = atoll(cJSON_GetStringValue(pField)); pWal->vers.snapshotVer = atoll(cJSON_GetStringValue(pField));
pField = cJSON_GetObjectItem(pMeta, "commitVer"); pField = cJSON_GetObjectItem(pMeta, "commitVer");
pWal->commitVersion = atoll(cJSON_GetStringValue(pField)); pWal->vers.commitVer = atoll(cJSON_GetStringValue(pField));
pField = cJSON_GetObjectItem(pMeta, "lastVer"); pField = cJSON_GetObjectItem(pMeta, "lastVer");
pWal->lastVersion = atoll(cJSON_GetStringValue(pField)); pWal->vers.lastVer = atoll(cJSON_GetStringValue(pField));
pFiles = cJSON_GetObjectItem(pRoot, "files"); pFiles = cJSON_GetObjectItem(pRoot, "files");
int sz = cJSON_GetArraySize(pFiles); int sz = cJSON_GetArraySize(pFiles);
...@@ -161,7 +161,7 @@ static int walFindCurMetaVer(SWal* pWal) { ...@@ -161,7 +161,7 @@ static int walFindCurMetaVer(SWal* pWal) {
DIR *dir = opendir(pWal->path); DIR *dir = opendir(pWal->path);
if(dir == NULL) { if(dir == NULL) {
wError("vgId:%d, path:%s, failed to open since %s", pWal->vgId, pWal->path, strerror(errno)); wError("vgId:%d, path:%s, failed to open since %s", pWal->cfg.vgId, pWal->path, strerror(errno));
return -1; return -1;
} }
......
...@@ -86,21 +86,15 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { ...@@ -86,21 +86,15 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
pWal->writeCur = -1; pWal->writeCur = -1;
//set config //set config
pWal->vgId = pCfg->vgId; memcpy(&pWal->cfg, pCfg, sizeof(SWalCfg));
pWal->fsyncPeriod = pCfg->fsyncPeriod;
pWal->rollPeriod = pCfg->rollPeriod;
pWal->segSize = pCfg->segSize;
pWal->retentionSize = pCfg->retentionSize;
pWal->retentionPeriod = pCfg->retentionPeriod;
pWal->level = pCfg->walLevel;
//init version info //init version info
pWal->firstVersion = -1; pWal->vers.firstVer = -1;
pWal->commitVersion = -1; pWal->vers.commitVer = -1;
pWal->snapshotVersion = -1; pWal->vers.snapshotVer = -1;
pWal->lastVersion = -1; pWal->vers.lastVer = -1;
pWal->snapshottingVer = -1; pWal->vers.verInSnapshotting = -1;
pWal->totSize = 0; pWal->totSize = 0;
...@@ -108,8 +102,8 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { ...@@ -108,8 +102,8 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
pWal->lastRollSeq = -1; pWal->lastRollSeq = -1;
//init write buffer //init write buffer
memset(&pWal->head, 0, sizeof(SWalHead)); memset(&pWal->writeHead, 0, sizeof(SWalHead));
pWal->head.head.sver = 0; pWal->writeHead.head.sver = 0;
tstrncpy(pWal->path, path, sizeof(pWal->path)); tstrncpy(pWal->path, path, sizeof(pWal->path));
pthread_mutex_init(&pWal->mutex, NULL); pthread_mutex_init(&pWal->mutex, NULL);
...@@ -129,7 +123,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { ...@@ -129,7 +123,7 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
} }
walReadMeta(pWal); walReadMeta(pWal);
wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->vgId, pWal, pWal->level, pWal->fsyncPeriod); wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level, pWal->cfg.fsyncPeriod);
return pWal; return pWal;
} }
...@@ -137,17 +131,17 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { ...@@ -137,17 +131,17 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
int32_t walAlter(SWal *pWal, SWalCfg *pCfg) { int32_t walAlter(SWal *pWal, SWalCfg *pCfg) {
if (pWal == NULL) return TSDB_CODE_WAL_APP_ERROR; if (pWal == NULL) return TSDB_CODE_WAL_APP_ERROR;
if (pWal->level == pCfg->walLevel && pWal->fsyncPeriod == pCfg->fsyncPeriod) { if (pWal->cfg.level == pCfg->level && pWal->cfg.fsyncPeriod == pCfg->fsyncPeriod) {
wDebug("vgId:%d, old walLevel:%d fsync:%d, new walLevel:%d fsync:%d not change", pWal->vgId, pWal->level, wDebug("vgId:%d, old walLevel:%d fsync:%d, new walLevel:%d fsync:%d not change", pWal->cfg.vgId, pWal->cfg.level,
pWal->fsyncPeriod, pCfg->walLevel, pCfg->fsyncPeriod); pWal->cfg.fsyncPeriod, pCfg->level, pCfg->fsyncPeriod);
return 0; return 0;
} }
wInfo("vgId:%d, change old walLevel:%d fsync:%d, new walLevel:%d fsync:%d", pWal->vgId, pWal->level, wInfo("vgId:%d, change old walLevel:%d fsync:%d, new walLevel:%d fsync:%d", pWal->cfg.vgId, pWal->cfg.level,
pWal->fsyncPeriod, pCfg->walLevel, pCfg->fsyncPeriod); pWal->cfg.fsyncPeriod, pCfg->level, pCfg->fsyncPeriod);
pWal->level = pCfg->walLevel; pWal->cfg.level = pCfg->level;
pWal->fsyncPeriod = pCfg->fsyncPeriod; pWal->cfg.fsyncPeriod = pCfg->fsyncPeriod;
pWal->fsyncSeq = pCfg->fsyncPeriod / 1000; pWal->fsyncSeq = pCfg->fsyncPeriod / 1000;
if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1; if (pWal->fsyncSeq <= 0) pWal->fsyncSeq = 1;
...@@ -171,22 +165,22 @@ void walClose(SWal *pWal) { ...@@ -171,22 +165,22 @@ void walClose(SWal *pWal) {
static int32_t walInitObj(SWal *pWal) { static int32_t walInitObj(SWal *pWal) {
if (taosMkDir(pWal->path) != 0) { if (taosMkDir(pWal->path) != 0) {
wError("vgId:%d, path:%s, failed to create directory since %s", pWal->vgId, pWal->path, strerror(errno)); wError("vgId:%d, path:%s, failed to create directory since %s", pWal->cfg.vgId, pWal->path, strerror(errno));
return TAOS_SYSTEM_ERROR(errno); return TAOS_SYSTEM_ERROR(errno);
} }
pWal->fileInfoSet = taosArrayInit(8, sizeof(WalFileInfo)); pWal->fileInfoSet = taosArrayInit(8, sizeof(WalFileInfo));
if(pWal->fileInfoSet == NULL) { if(pWal->fileInfoSet == NULL) {
wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->vgId, pWal->path, strerror(errno)); wError("vgId:%d, path:%s, failed to init taosArray %s", pWal->cfg.vgId, pWal->path, strerror(errno));
return TAOS_SYSTEM_ERROR(errno); return TAOS_SYSTEM_ERROR(errno);
} }
wDebug("vgId:%d, object is initialized", pWal->vgId); wDebug("vgId:%d, object is initialized", pWal->cfg.vgId);
return 0; return 0;
} }
static void walFreeObj(void *wal) { static void walFreeObj(void *wal) {
SWal *pWal = wal; SWal *pWal = wal;
wDebug("vgId:%d, wal:%p is freed", pWal->vgId, pWal); wDebug("vgId:%d, wal:%p is freed", pWal->cfg.vgId, pWal);
tfClose(pWal->writeLogTfd); tfClose(pWal->writeLogTfd);
tfClose(pWal->writeIdxTfd); tfClose(pWal->writeIdxTfd);
...@@ -197,7 +191,7 @@ static void walFreeObj(void *wal) { ...@@ -197,7 +191,7 @@ static void walFreeObj(void *wal) {
} }
static bool walNeedFsync(SWal *pWal) { static bool walNeedFsync(SWal *pWal) {
if (pWal->fsyncPeriod <= 0 || pWal->level != TAOS_WAL_FSYNC) { if (pWal->cfg.fsyncPeriod <= 0 || pWal->cfg.level != TAOS_WAL_FSYNC) {
return false; return false;
} }
...@@ -217,10 +211,10 @@ static void walFsyncAll() { ...@@ -217,10 +211,10 @@ static void walFsyncAll() {
SWal *pWal = taosIterateRef(tsWal.refSetId, 0); SWal *pWal = taosIterateRef(tsWal.refSetId, 0);
while (pWal) { while (pWal) {
if (walNeedFsync(pWal)) { if (walNeedFsync(pWal)) {
wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, atomic_load_32(&tsWal.seq)); wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->cfg.vgId, pWal->cfg.level, pWal->fsyncSeq, atomic_load_32(&tsWal.seq));
int32_t code = tfFsync(pWal->writeLogTfd); int32_t code = tfFsync(pWal->writeLogTfd);
if (code != 0) { if (code != 0) {
wError("vgId:%d, file:%"PRId64".log, failed to fsync since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(code)); wError("vgId:%d, file:%"PRId64".log, failed to fsync since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(code));
} }
} }
pWal = taosIterateRef(tsWal.refSetId, pWal->refId); pWal = taosIterateRef(tsWal.refSetId, pWal->refId);
......
...@@ -16,6 +16,147 @@ ...@@ -16,6 +16,147 @@
#include "walInt.h" #include "walInt.h"
#include "tfile.h" #include "tfile.h"
SWalReadHandle* walOpenReadHandle(SWal* pWal) {
SWalReadHandle *pRead = malloc(sizeof(SWalReadHandle));
if(pRead == NULL) {
return NULL;
}
memset(pRead, 0, sizeof(SWalReadHandle));
pRead->pWal = pWal;
pRead->readIdxTfd = -1;
pRead->readLogTfd = -1;
return NULL;
}
void walCloseReadHandle(SWalReadHandle *pRead) {
tfClose(pRead->readIdxTfd);
tfClose(pRead->readLogTfd);
free(pRead);
}
int32_t walRegisterRead(SWalReadHandle *pRead, int64_t ver) {
return 0;
}
static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, int64_t ver) {
int code = 0;
int64_t idxTfd = pRead->readIdxTfd;
int64_t logTfd = pRead->readLogTfd;
//seek position
int64_t offset = (ver - fileFirstVer) * WAL_IDX_ENTRY_SIZE;
code = tfLseek(idxTfd, offset, SEEK_SET);
if(code != 0) {
return -1;
}
WalIdxEntry entry;
code = tfRead(idxTfd, &entry, sizeof(WalIdxEntry));
if(code != 0) {
return -1;
}
//TODO:deserialize
ASSERT(entry.ver == ver);
code = tfLseek(logTfd, entry.offset, SEEK_SET);
if (code != 0) {
return -1;
}
return code;
}
static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
char fnameStr[WAL_FILE_LEN];
tfClose(pRead->readIdxTfd);
tfClose(pRead->readLogTfd);
walBuildLogName(pRead->pWal, fileFirstVer, fnameStr);
int logTfd = tfOpenRead(fnameStr);
if(logTfd < 0) {
return -1;
}
walBuildIdxName(pRead->pWal, fileFirstVer, fnameStr);
int idxTfd = tfOpenRead(fnameStr);
if(idxTfd < 0) {
return -1;
}
pRead->readLogTfd = logTfd;
pRead->readIdxTfd = idxTfd;
return 0;
}
static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
int code;
SWal *pWal = pRead->pWal;
if(ver == pWal->vers.lastVer) {
return 0;
}
if(ver > pWal->vers.lastVer || ver < pWal->vers.firstVer) {
return -1;
}
if(ver < pWal->vers.snapshotVer) {
}
WalFileInfo tmpInfo;
tmpInfo.firstVer = ver;
//bsearch in fileSet
WalFileInfo* pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
ASSERT(pRet != NULL);
if(pRead->curFileFirstVer != pRet->firstVer) {
code = walReadChangeFile(pRead, pRet->firstVer);
if(code < 0) {
//TODO: set error flag
return -1;
}
}
code = walReadSeekFilePos(pRead, pRet->firstVer, ver);
if(code < 0) {
return -1;
}
pRead->curVersion = ver;
return 0;
}
int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
int code;
//TODO: check wal life
if(pRead->curVersion != ver) {
walReadSeekVer(pRead, ver);
}
if(!tfValid(pRead->readLogTfd)) return -1;
if(sizeof(SWalHead) != tfRead(pRead->readLogTfd, &pRead->head, sizeof(SWalHead))) {
return -1;
}
code = walValidHeadCksum(&pRead->head);
if(code != 0) {
return -1;
}
if(pRead->capacity < pRead->head.head.len) {
void* ptr = realloc(pRead, pRead->head.head.len);
if(ptr == NULL) {
return -1;
}
pRead = ptr;
pRead->capacity = pRead->head.head.len;
}
if(pRead->head.head.len != tfRead(pRead->readLogTfd, &pRead->head.head.body, pRead->head.head.len)) {
return -1;
}
code = walValidBodyCksum(&pRead->head);
if(code != 0) {
return -1;
}
return 0;
}
int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) { int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) {
int code; int code;
code = walSeekVer(pWal, ver); code = walSeekVer(pWal, ver);
...@@ -42,7 +183,7 @@ int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) { ...@@ -42,7 +183,7 @@ int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) {
*ppHead = NULL; *ppHead = NULL;
return -1; return -1;
} }
if(tfRead(pWal->writeLogTfd, (*ppHead)->head.cont, (*ppHead)->head.len) != (*ppHead)->head.len) { if(tfRead(pWal->writeLogTfd, (*ppHead)->head.body, (*ppHead)->head.len) != (*ppHead)->head.len) {
return -1; return -1;
} }
//TODO: endian compatibility processing after read //TODO: endian compatibility processing after read
......
...@@ -78,10 +78,12 @@ int walChangeFile(SWal *pWal, int64_t ver) { ...@@ -78,10 +78,12 @@ int walChangeFile(SWal *pWal, int64_t ver) {
code = tfClose(pWal->writeLogTfd); code = tfClose(pWal->writeLogTfd);
if(code != 0) { if(code != 0) {
//TODO //TODO
return -1;
} }
code = tfClose(pWal->writeIdxTfd); code = tfClose(pWal->writeIdxTfd);
if(code != 0) { if(code != 0) {
//TODO //TODO
return -1;
} }
WalFileInfo tmpInfo; WalFileInfo tmpInfo;
tmpInfo.firstVer = ver; tmpInfo.firstVer = ver;
...@@ -106,24 +108,19 @@ int walChangeFile(SWal *pWal, int64_t ver) { ...@@ -106,24 +108,19 @@ int walChangeFile(SWal *pWal, int64_t ver) {
pWal->writeLogTfd = logTfd; pWal->writeLogTfd = logTfd;
pWal->writeIdxTfd = idxTfd; pWal->writeIdxTfd = idxTfd;
return code; return fileFirstVer;
}
int walGetVerOffset(SWal* pWal, int64_t ver) {
int code;
return 0;
} }
int walSeekVer(SWal *pWal, int64_t ver) { int walSeekVer(SWal *pWal, int64_t ver) {
int code; int code;
if(ver == pWal->lastVersion) { if(ver == pWal->vers.lastVer) {
return 0; return 0;
} }
if(ver > pWal->lastVersion || ver < pWal->firstVersion) { if(ver > pWal->vers.lastVer|| ver < pWal->vers.firstVer) {
return -1; return -1;
} }
if(ver < pWal->snapshotVersion) { if(ver < pWal->vers.snapshotVer) {
//TODO: set flag to prevent roll back
} }
if(ver < walGetCurFileFirstVer(pWal) || (ver > walGetCurFileLastVer(pWal))) { if(ver < walGetCurFileFirstVer(pWal) || (ver > walGetCurFileLastVer(pWal))) {
code = walChangeFile(pWal, ver); code = walChangeFile(pWal, ver);
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include "os.h" #include "os.h"
#include "walInt.h" #include "walInt.h"
#if 0
int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId) { int32_t walGetNextFile(SWal *pWal, int64_t *nextFileId) {
int64_t curFileId = *nextFileId; int64_t curFileId = *nextFileId;
int64_t minFileId = INT64_MAX; int64_t minFileId = INT64_MAX;
...@@ -116,3 +117,4 @@ int32_t walGetNewFile(SWal *pWal, int64_t *newFileId) { ...@@ -116,3 +117,4 @@ int32_t walGetNewFile(SWal *pWal, int64_t *newFileId) {
return 0; return 0;
} }
#endif
...@@ -114,22 +114,22 @@ void walRemoveAllOldFiles(void *handle) { ...@@ -114,22 +114,22 @@ void walRemoveAllOldFiles(void *handle) {
#endif #endif
int32_t walCommit(SWal *pWal, int64_t ver) { int32_t walCommit(SWal *pWal, int64_t ver) {
ASSERT(pWal->commitVersion >= pWal->snapshotVersion); ASSERT(pWal->vers.commitVer >= pWal->vers.snapshotVer);
ASSERT(pWal->commitVersion <= pWal->lastVersion); ASSERT(pWal->vers.commitVer <= pWal->vers.lastVer);
if(ver < pWal->commitVersion || ver > pWal->lastVersion) { if(ver < pWal->vers.commitVer || ver > pWal->vers.lastVer) {
return -1; return -1;
} }
pWal->commitVersion = ver; pWal->vers.commitVer = ver;
return 0; return 0;
} }
int32_t walRollback(SWal *pWal, int64_t ver) { int32_t walRollback(SWal *pWal, int64_t ver) {
int code; int code;
char fnameStr[WAL_FILE_LEN]; char fnameStr[WAL_FILE_LEN];
if(ver == pWal->lastVersion) { if(ver == pWal->vers.lastVer) {
return 0; return 0;
} }
if(ver > pWal->lastVersion || ver < pWal->commitVersion) { if(ver > pWal->vers.lastVer || ver < pWal->vers.commitVer) {
return -1; return -1;
} }
pthread_mutex_lock(&pWal->mutex); pthread_mutex_lock(&pWal->mutex);
...@@ -220,7 +220,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) { ...@@ -220,7 +220,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
if(code < 0) { if(code < 0) {
return -1; return -1;
} }
pWal->lastVersion = ver - 1; pWal->vers.lastVer = ver - 1;
((WalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->lastVer = ver - 1; ((WalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->lastVer = ver - 1;
((WalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->fileSize = entry.offset; ((WalFileInfo*)taosArrayGetLast(pWal->fileInfoSet))->fileSize = entry.offset;
...@@ -230,9 +230,9 @@ int32_t walRollback(SWal *pWal, int64_t ver) { ...@@ -230,9 +230,9 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
} }
int32_t walBeginTakeSnapshot(SWal* pWal, int64_t ver) { int32_t walBeginTakeSnapshot(SWal* pWal, int64_t ver) {
pWal->snapshottingVer = ver; pWal->vers.verInSnapshotting = ver;
//check file rolling //check file rolling
if(pWal->retentionPeriod == 0) { if(pWal->cfg.retentionPeriod == 0) {
walRoll(pWal); walRoll(pWal);
} }
...@@ -240,10 +240,10 @@ int32_t walBeginTakeSnapshot(SWal* pWal, int64_t ver) { ...@@ -240,10 +240,10 @@ int32_t walBeginTakeSnapshot(SWal* pWal, int64_t ver) {
} }
int32_t walEndTakeSnapshot(SWal *pWal) { int32_t walEndTakeSnapshot(SWal *pWal) {
int64_t ver = pWal->snapshottingVer; int64_t ver = pWal->vers.verInSnapshotting;
if(ver == -1) return -1; if(ver == -1) return -1;
pWal->snapshotVersion = ver; pWal->vers.snapshotVer = ver;
int ts = taosGetTimestampSec(); int ts = taosGetTimestampSec();
int deleteCnt = 0; int deleteCnt = 0;
...@@ -257,8 +257,8 @@ int32_t walEndTakeSnapshot(SWal *pWal) { ...@@ -257,8 +257,8 @@ int32_t walEndTakeSnapshot(SWal *pWal) {
} }
//iterate files, until the searched result //iterate files, until the searched result
for(WalFileInfo* iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) { for(WalFileInfo* iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) {
if(pWal->totSize > pWal->retentionSize || if(pWal->totSize > pWal->cfg.retentionSize ||
iter->closeTs + pWal->retentionPeriod > ts) { iter->closeTs + pWal->cfg.retentionPeriod > ts) {
//delete according to file size or close time //delete according to file size or close time
deleteCnt++; deleteCnt++;
newTotSize -= iter->fileSize; newTotSize -= iter->fileSize;
...@@ -278,13 +278,13 @@ int32_t walEndTakeSnapshot(SWal *pWal) { ...@@ -278,13 +278,13 @@ int32_t walEndTakeSnapshot(SWal *pWal) {
taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt); taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt);
if(taosArrayGetSize(pWal->fileInfoSet) == 0) { if(taosArrayGetSize(pWal->fileInfoSet) == 0) {
pWal->writeCur = -1; pWal->writeCur = -1;
pWal->firstVersion = -1; pWal->vers.firstVer = -1;
} else { } else {
pWal->firstVersion = ((WalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer; pWal->vers.firstVer = ((WalFileInfo*)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
} }
pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;; pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;;
pWal->totSize = newTotSize; pWal->totSize = newTotSize;
pWal->snapshottingVer = -1; pWal->vers.verInSnapshotting = -1;
//save snapshot ver, commit ver //save snapshot ver, commit ver
int code = walWriteMeta(pWal); int code = walWriteMeta(pWal);
...@@ -311,7 +311,7 @@ int walRoll(SWal *pWal) { ...@@ -311,7 +311,7 @@ int walRoll(SWal *pWal) {
} }
int64_t idxTfd, logTfd; int64_t idxTfd, logTfd;
//create new file //create new file
int64_t newFileFirstVersion = pWal->lastVersion + 1; int64_t newFileFirstVersion = pWal->vers.lastVer + 1;
char fnameStr[WAL_FILE_LEN]; char fnameStr[WAL_FILE_LEN];
walBuildIdxName(pWal, newFileFirstVersion, fnameStr); walBuildIdxName(pWal, newFileFirstVersion, fnameStr);
idxTfd = tfOpenCreateWrite(fnameStr); idxTfd = tfOpenCreateWrite(fnameStr);
...@@ -357,18 +357,18 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i ...@@ -357,18 +357,18 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
int code = 0; int code = 0;
// no wal // no wal
if (pWal->level == TAOS_WAL_NOLOG) return 0; if (pWal->cfg.level == TAOS_WAL_NOLOG) return 0;
if (index == pWal->lastVersion + 1) { if (index == pWal->vers.lastVer + 1) {
if(taosArrayGetSize(pWal->fileInfoSet) == 0) { if(taosArrayGetSize(pWal->fileInfoSet) == 0) {
pWal->firstVersion = index; pWal->vers.firstVer = index;
code = walRoll(pWal); code = walRoll(pWal);
ASSERT(code == 0); ASSERT(code == 0);
} else { } else {
int64_t passed = walGetSeq() - pWal->lastRollSeq; int64_t passed = walGetSeq() - pWal->lastRollSeq;
if(pWal->rollPeriod != -1 && pWal->rollPeriod != 0 && passed > pWal->rollPeriod) { if(pWal->cfg.rollPeriod != -1 && pWal->cfg.rollPeriod != 0 && passed > pWal->cfg.rollPeriod) {
walRoll(pWal); walRoll(pWal);
} else if(pWal->segSize != -1 && pWal->segSize != 0 && walGetLastFileSize(pWal) > pWal->segSize) { } else if(pWal->cfg.segSize != -1 && pWal->cfg.segSize != 0 && walGetLastFileSize(pWal) > pWal->cfg.segSize) {
walRoll(pWal); walRoll(pWal);
} }
} }
...@@ -380,23 +380,23 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i ...@@ -380,23 +380,23 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
/*if (!tfValid(pWal->curLogTfd)) return 0;*/ /*if (!tfValid(pWal->curLogTfd)) return 0;*/
pthread_mutex_lock(&pWal->mutex); pthread_mutex_lock(&pWal->mutex);
pWal->head.head.version = index; pWal->writeHead.head.version = index;
pWal->head.head.len = bodyLen; pWal->writeHead.head.len = bodyLen;
pWal->head.head.msgType = msgType; pWal->writeHead.head.msgType = msgType;
pWal->head.cksumHead = walCalcHeadCksum(&pWal->head); pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead);
pWal->head.cksumBody = walCalcBodyCksum(body, bodyLen); pWal->writeHead.cksumBody = walCalcBodyCksum(body, bodyLen);
if (tfWrite(pWal->writeLogTfd, &pWal->head, sizeof(SWalHead)) != sizeof(SWalHead)) { if (tfWrite(pWal->writeLogTfd, &pWal->writeHead, sizeof(SWalHead)) != sizeof(SWalHead)) {
//ftruncate //ftruncate
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(errno)); wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(errno));
} }
if (tfWrite(pWal->writeLogTfd, &body, bodyLen) != bodyLen) { if (tfWrite(pWal->writeLogTfd, &body, bodyLen) != bodyLen) {
//ftruncate //ftruncate
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(errno)); wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(errno));
} }
code = walWriteIndex(pWal, index, walGetCurFileOffset(pWal)); code = walWriteIndex(pWal, index, walGetCurFileOffset(pWal));
if(code != 0) { if(code != 0) {
...@@ -405,7 +405,7 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i ...@@ -405,7 +405,7 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
} }
//set status //set status
pWal->lastVersion = index; pWal->vers.lastVer = index;
pWal->totSize += sizeof(SWalHead) + bodyLen; pWal->totSize += sizeof(SWalHead) + bodyLen;
walGetCurFileInfo(pWal)->lastVer = index; walGetCurFileInfo(pWal)->lastVer = index;
walGetCurFileInfo(pWal)->fileSize += sizeof(SWalHead) + bodyLen; walGetCurFileInfo(pWal)->fileSize += sizeof(SWalHead) + bodyLen;
...@@ -416,10 +416,10 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i ...@@ -416,10 +416,10 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
} }
void walFsync(SWal *pWal, bool forceFsync) { void walFsync(SWal *pWal, bool forceFsync) {
if (forceFsync || (pWal->level == TAOS_WAL_FSYNC && pWal->fsyncPeriod == 0)) { if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) {
wTrace("vgId:%d, fileId:%"PRId64".log, do fsync", pWal->vgId, walGetCurFileFirstVer(pWal)); wTrace("vgId:%d, fileId:%"PRId64".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal));
if (tfFsync(pWal->writeLogTfd) < 0) { if (tfFsync(pWal->writeLogTfd) < 0) {
wError("vgId:%d, file:%"PRId64".log, fsync failed since %s", pWal->vgId, walGetCurFileFirstVer(pWal), strerror(errno)); wError("vgId:%d, file:%"PRId64".log, fsync failed since %s", pWal->cfg.vgId, walGetCurFileFirstVer(pWal), strerror(errno));
} }
} }
} }
...@@ -492,29 +492,29 @@ int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) { ...@@ -492,29 +492,29 @@ int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) {
} }
#endif #endif
static int walValidateOffset(SWal* pWal, int64_t ver) { /*static int walValidateOffset(SWal* pWal, int64_t ver) {*/
int code = 0; /*int code = 0;*/
SWalHead *pHead = NULL; /*SWalHead *pHead = NULL;*/
code = (int)walRead(pWal, &pHead, ver); /*code = (int)walRead(pWal, &pHead, ver);*/
if(pHead->head.version != ver) { /*if(pHead->head.version != ver) {*/
return -1; /*return -1;*/
} /*}*/
return 0; /*return 0;*/
} /*}*/
static int64_t walGetOffset(SWal* pWal, int64_t ver) { /*static int64_t walGetOffset(SWal* pWal, int64_t ver) {*/
int code = walSeekVer(pWal, ver); /*int code = walSeekVer(pWal, ver);*/
if(code != 0) { /*if(code != 0) {*/
return -1; /*return -1;*/
} /*}*/
code = walValidateOffset(pWal, ver); /*code = walValidateOffset(pWal, ver);*/
if(code != 0) { /*if(code != 0) {*/
return -1; /*return -1;*/
} /*}*/
return 0; /*return 0;*/
} /*}*/
#if 0 #if 0
static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd, int64_t *offset) { static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd, int64_t *offset) {
......
...@@ -24,7 +24,7 @@ class WalCleanEnv : public ::testing::Test { ...@@ -24,7 +24,7 @@ class WalCleanEnv : public ::testing::Test {
pCfg->segSize = -1; pCfg->segSize = -1;
pCfg->retentionPeriod = 0; pCfg->retentionPeriod = 0;
pCfg->retentionSize = 0; pCfg->retentionSize = 0;
pCfg->walLevel = TAOS_WAL_FSYNC; pCfg->level = TAOS_WAL_FSYNC;
pWal = walOpen(pathName, pCfg); pWal = walOpen(pathName, pCfg);
free(pCfg); free(pCfg);
ASSERT(pWal != NULL); ASSERT(pWal != NULL);
...@@ -56,7 +56,7 @@ class WalCleanDeleteEnv : public ::testing::Test { ...@@ -56,7 +56,7 @@ class WalCleanDeleteEnv : public ::testing::Test {
memset(pCfg, 0, sizeof(SWalCfg)); memset(pCfg, 0, sizeof(SWalCfg));
pCfg->retentionPeriod = 0; pCfg->retentionPeriod = 0;
pCfg->retentionSize = 0; pCfg->retentionSize = 0;
pCfg->walLevel = TAOS_WAL_FSYNC; pCfg->level = TAOS_WAL_FSYNC;
pWal = walOpen(pathName, pCfg); pWal = walOpen(pathName, pCfg);
free(pCfg); free(pCfg);
ASSERT(pWal != NULL); ASSERT(pWal != NULL);
...@@ -95,7 +95,7 @@ class WalKeepEnv : public ::testing::Test { ...@@ -95,7 +95,7 @@ class WalKeepEnv : public ::testing::Test {
pCfg->segSize = -1; pCfg->segSize = -1;
pCfg->retentionPeriod = 0; pCfg->retentionPeriod = 0;
pCfg->retentionSize = 0; pCfg->retentionSize = 0;
pCfg->walLevel = TAOS_WAL_FSYNC; pCfg->level = TAOS_WAL_FSYNC;
pWal = walOpen(pathName, pCfg); pWal = walOpen(pathName, pCfg);
free(pCfg); free(pCfg);
ASSERT(pWal != NULL); ASSERT(pWal != NULL);
...@@ -164,18 +164,18 @@ TEST_F(WalKeepEnv, readOldMeta) { ...@@ -164,18 +164,18 @@ TEST_F(WalKeepEnv, readOldMeta) {
for(int i = 0; i < 10; i++) { for(int i = 0; i < 10; i++) {
code = walWrite(pWal, i, i+1, (void*)ranStr, len); code = walWrite(pWal, i, i+1, (void*)ranStr, len);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->lastVersion, i); ASSERT_EQ(pWal->vers.lastVer, i);
code = walWrite(pWal, i+2, i, (void*)ranStr, len); code = walWrite(pWal, i+2, i, (void*)ranStr, len);
ASSERT_EQ(code, -1); ASSERT_EQ(code, -1);
ASSERT_EQ(pWal->lastVersion, i); ASSERT_EQ(pWal->vers.lastVer, i);
} }
char* oldss = walMetaSerialize(pWal); char* oldss = walMetaSerialize(pWal);
TearDown(); TearDown();
SetUp(); SetUp();
ASSERT_EQ(pWal->firstVersion, 0); ASSERT_EQ(pWal->vers.firstVer, 0);
ASSERT_EQ(pWal->lastVersion, 9); ASSERT_EQ(pWal->vers.lastVer, 9);
char* newss = walMetaSerialize(pWal); char* newss = walMetaSerialize(pWal);
...@@ -195,10 +195,10 @@ TEST_F(WalCleanEnv, write) { ...@@ -195,10 +195,10 @@ TEST_F(WalCleanEnv, write) {
for(int i = 0; i < 10; i++) { for(int i = 0; i < 10; i++) {
code = walWrite(pWal, i, i+1, (void*)ranStr, len); code = walWrite(pWal, i, i+1, (void*)ranStr, len);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->lastVersion, i); ASSERT_EQ(pWal->vers.lastVer, i);
code = walWrite(pWal, i+2, i, (void*)ranStr, len); code = walWrite(pWal, i+2, i, (void*)ranStr, len);
ASSERT_EQ(code, -1); ASSERT_EQ(code, -1);
ASSERT_EQ(pWal->lastVersion, i); ASSERT_EQ(pWal->vers.lastVer, i);
} }
code = walWriteMeta(pWal); code = walWriteMeta(pWal);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
...@@ -211,14 +211,14 @@ TEST_F(WalCleanEnv, rollback) { ...@@ -211,14 +211,14 @@ TEST_F(WalCleanEnv, rollback) {
for(int i = 0; i < 10; i++) { for(int i = 0; i < 10; i++) {
code = walWrite(pWal, i, i+1, (void*)ranStr, len); code = walWrite(pWal, i, i+1, (void*)ranStr, len);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->lastVersion, i); ASSERT_EQ(pWal->vers.lastVer, i);
} }
code = walRollback(pWal, 5); code = walRollback(pWal, 5);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->lastVersion, 4); ASSERT_EQ(pWal->vers.lastVer, 4);
code = walRollback(pWal, 3); code = walRollback(pWal, 3);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->lastVersion, 2); ASSERT_EQ(pWal->vers.lastVer, 2);
code = walWriteMeta(pWal); code = walWriteMeta(pWal);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
} }
...@@ -231,16 +231,16 @@ TEST_F(WalCleanDeleteEnv, roll) { ...@@ -231,16 +231,16 @@ TEST_F(WalCleanDeleteEnv, roll) {
for(i = 0; i < 100; i++) { for(i = 0; i < 100; i++) {
code = walWrite(pWal, i, 0, (void*)ranStr, len); code = walWrite(pWal, i, 0, (void*)ranStr, len);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_EQ(pWal->lastVersion, i); ASSERT_EQ(pWal->vers.lastVer, i);
code = walCommit(pWal, i); code = walCommit(pWal, i);
ASSERT_EQ(pWal->commitVersion, i); ASSERT_EQ(pWal->vers.commitVer, i);
} }
walBeginTakeSnapshot(pWal, i-1); walBeginTakeSnapshot(pWal, i-1);
ASSERT_EQ(pWal->snapshottingVer, i-1); ASSERT_EQ(pWal->vers.verInSnapshotting, i-1);
walEndTakeSnapshot(pWal); walEndTakeSnapshot(pWal);
ASSERT_EQ(pWal->snapshotVersion, i-1); ASSERT_EQ(pWal->vers.snapshotVer, i-1);
ASSERT_EQ(pWal->snapshottingVer, -1); ASSERT_EQ(pWal->vers.verInSnapshotting, -1);
code = walWrite(pWal, 5, 0, (void*)ranStr, len); code = walWrite(pWal, 5, 0, (void*)ranStr, len);
ASSERT_NE(code, 0); ASSERT_NE(code, 0);
...@@ -249,7 +249,7 @@ TEST_F(WalCleanDeleteEnv, roll) { ...@@ -249,7 +249,7 @@ TEST_F(WalCleanDeleteEnv, roll) {
code = walWrite(pWal, i, 0, (void*)ranStr, len); code = walWrite(pWal, i, 0, (void*)ranStr, len);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
code = walCommit(pWal, i); code = walCommit(pWal, i);
ASSERT_EQ(pWal->commitVersion, i); ASSERT_EQ(pWal->vers.commitVer, i);
} }
//code = walWriteMeta(pWal); //code = walWriteMeta(pWal);
......
...@@ -250,7 +250,7 @@ void taosArrayPopFrontBatch(SArray* pArray, size_t cnt) { ...@@ -250,7 +250,7 @@ void taosArrayPopFrontBatch(SArray* pArray, size_t cnt) {
if(pArray->size == 0) { if(pArray->size == 0) {
return; return;
} }
memmove(pArray->pData, (char*)pArray->pData + cnt * pArray->elemSize, pArray->size); memmove(pArray->pData, (char*)pArray->pData + cnt * pArray->elemSize, pArray->size * pArray->elemSize);
} }
void taosArrayPopTailBatch(SArray* pArray, size_t cnt) { void taosArrayPopTailBatch(SArray* pArray, size_t cnt) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册