提交 03a54b5a 编写于 作者: L Liu Jicong

make tfinit called once

上级 756347e0
...@@ -91,7 +91,8 @@ typedef struct SWal { ...@@ -91,7 +91,8 @@ typedef struct SWal {
int32_t fsyncPeriod; // millisecond int32_t fsyncPeriod; // millisecond
int32_t rollPeriod; // second int32_t rollPeriod; // second
int64_t segSize; int64_t segSize;
int64_t rtSize; int64_t retentionSize;
int32_t retentionPeriod;
EWalType level; EWalType level;
//total size //total size
int64_t totSize; int64_t totSize;
...@@ -99,31 +100,24 @@ typedef struct SWal { ...@@ -99,31 +100,24 @@ typedef struct SWal {
int32_t fsyncSeq; int32_t fsyncSeq;
//reference //reference
int64_t refId; int64_t refId;
//current tfd //write tfd
int64_t curLogTfd; int64_t writeLogTfd;
int64_t curIdxTfd; int64_t writeIdxTfd;
//read tfd
int64_t readLogTfd;
int64_t readIdxTfd;
//current version //current version
int64_t curVersion; int64_t curVersion;
//current file version
//int64_t curFileFirstVersion;
//int64_t curFileLastVersion;
//wal lifecycle //wal lifecycle
int64_t firstVersion; int64_t firstVersion;
int64_t snapshotVersion; int64_t snapshotVersion;
int64_t commitVersion; int64_t commitVersion;
int64_t lastVersion; int64_t lastVersion;
//last file
//int64_t lastFileName;
//roll status //roll status
int64_t lastRollSeq; int64_t lastRollSeq;
//int64_t lastFileWriteSize;
//file set //file set
int32_t fileCursor; int32_t writeCur;
int32_t readCur;
SArray* fileInfoSet; SArray* fileInfoSet;
//ctl //ctl
int32_t curStatus; int32_t curStatus;
......
...@@ -146,6 +146,13 @@ void* taosArrayInsert(SArray* pArray, size_t index, void* pData); ...@@ -146,6 +146,13 @@ void* taosArrayInsert(SArray* pArray, size_t index, void* pData);
*/ */
void taosArraySet(SArray* pArray, size_t index, void* pData); void taosArraySet(SArray* pArray, size_t index, void* pData);
/**
* remove some data entry from front
* @param pArray
* @param cnt
*/
void taosArrayPopFrontBatch(SArray* pArray, size_t cnt);
/** /**
* remove data entry of the given index * remove data entry of the given index
* @param pArray * @param pArray
......
...@@ -49,26 +49,26 @@ static inline int64_t walGetLastFileFirstVer(SWal* pWal) { ...@@ -49,26 +49,26 @@ static inline int64_t walGetLastFileFirstVer(SWal* pWal) {
} }
static inline int64_t walGetCurFileFirstVer(SWal* pWal) { static inline int64_t walGetCurFileFirstVer(SWal* pWal) {
WalFileInfo* pInfo = (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->fileCursor); WalFileInfo* pInfo = (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur);
return pInfo->firstVer; return pInfo->firstVer;
} }
static inline int64_t walGetCurFileLastVer(SWal* pWal) { static inline int64_t walGetCurFileLastVer(SWal* pWal) {
WalFileInfo* pInfo = (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->fileCursor); WalFileInfo* pInfo = (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur);
return pInfo->firstVer; return pInfo->firstVer;
} }
static inline int64_t walGetCurFileOffset(SWal* pWal) { static inline int64_t walGetCurFileOffset(SWal* pWal) {
WalFileInfo* pInfo = (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->fileCursor); WalFileInfo* pInfo = (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur);
return pInfo->fileSize; return pInfo->fileSize;
} }
static inline bool walCurFileClosed(SWal* pWal) { static inline bool walCurFileClosed(SWal* pWal) {
return taosArrayGetSize(pWal->fileInfoSet) != pWal->fileCursor; return taosArrayGetSize(pWal->fileInfoSet) != pWal->writeCur;
} }
static inline WalFileInfo* walGetCurFileInfo(SWal* pWal) { static inline WalFileInfo* walGetCurFileInfo(SWal* pWal) {
return (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->fileCursor); return (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur);
} }
static inline int walBuildLogName(SWal*pWal, int64_t fileFirstVer, char* buf) { static inline int walBuildLogName(SWal*pWal, int64_t fileFirstVer, char* buf) {
......
...@@ -23,25 +23,25 @@ ...@@ -23,25 +23,25 @@
static int walSeekFilePos(SWal* pWal, int64_t ver) { static int walSeekFilePos(SWal* pWal, int64_t ver) {
int code = 0; int code = 0;
int64_t idxTfd = pWal->curIdxTfd; int64_t idxTfd = pWal->writeIdxTfd;
int64_t logTfd = pWal->curLogTfd; int64_t logTfd = pWal->writeLogTfd;
//seek position //seek position
int64_t offset = (ver - walGetCurFileFirstVer(pWal)) * WAL_IDX_ENTRY_SIZE; int64_t offset = (ver - walGetCurFileFirstVer(pWal)) * WAL_IDX_ENTRY_SIZE;
code = tfLseek(idxTfd, offset, SEEK_SET); code = tfLseek(idxTfd, offset, SEEK_SET);
if(code != 0) { if(code != 0) {
return -1;
} }
int64_t readBuf[2]; int64_t readBuf[2];
code = tfRead(idxTfd, readBuf, sizeof(readBuf)); code = tfRead(idxTfd, readBuf, sizeof(readBuf));
if(code != 0) { if(code != 0) {
return -1;
} }
//TODO:deserialize //TODO:deserialize
ASSERT(readBuf[0] == ver); ASSERT(readBuf[0] == ver);
code = tfLseek(logTfd, readBuf[1], SEEK_CUR); code = tfLseek(logTfd, readBuf[1], SEEK_CUR);
if (code != 0) { if (code != 0) {
return -1;
} }
/*pWal->curLogOffset = readBuf[1];*/ /*pWal->curLogOffset = readBuf[1];*/
pWal->curVersion = ver; pWal->curVersion = ver;
...@@ -52,11 +52,11 @@ static int walChangeFile(SWal *pWal, int64_t ver) { ...@@ -52,11 +52,11 @@ static int walChangeFile(SWal *pWal, int64_t ver) {
int code = 0; int code = 0;
int64_t idxTfd, logTfd; int64_t idxTfd, logTfd;
char fnameStr[WAL_FILE_LEN]; char fnameStr[WAL_FILE_LEN];
code = tfClose(pWal->curLogTfd); code = tfClose(pWal->writeLogTfd);
if(code != 0) { if(code != 0) {
//TODO //TODO
} }
code = tfClose(pWal->curIdxTfd); code = tfClose(pWal->writeIdxTfd);
if(code != 0) { if(code != 0) {
//TODO //TODO
} }
...@@ -81,14 +81,14 @@ static int walChangeFile(SWal *pWal, int64_t ver) { ...@@ -81,14 +81,14 @@ static int walChangeFile(SWal *pWal, int64_t ver) {
logTfd = tfOpenReadWrite(fnameStr); logTfd = tfOpenReadWrite(fnameStr);
} }
pWal->curLogTfd = logTfd; pWal->writeLogTfd = logTfd;
pWal->curIdxTfd = idxTfd; pWal->writeIdxTfd = idxTfd;
return code; return code;
} }
int walSeekVer(SWal *pWal, int64_t ver) { int walSeekVer(SWal *pWal, int64_t ver) {
if((!(pWal->curStatus & WAL_CUR_FAILED)) int code;
&& ver == pWal->curVersion) { if((!(pWal->curStatus & WAL_CUR_FAILED)) && ver == pWal->curVersion) {
return 0; return 0;
} }
if(ver > pWal->lastVersion) { if(ver > pWal->lastVersion) {
...@@ -103,9 +103,15 @@ int walSeekVer(SWal *pWal, int64_t ver) { ...@@ -103,9 +103,15 @@ int walSeekVer(SWal *pWal, int64_t ver) {
//TODO: seek snapshotted log, invalid in some cases //TODO: seek snapshotted log, invalid in some cases
} }
if(ver < walGetCurFileFirstVer(pWal) || (ver > walGetCurFileLastVer(pWal))) { if(ver < walGetCurFileFirstVer(pWal) || (ver > walGetCurFileLastVer(pWal))) {
walChangeFile(pWal, ver); code = walChangeFile(pWal, ver);
if(code != 0) {
return -1;
}
} }
walSeekFilePos(pWal, ver); code = walSeekFilePos(pWal, ver);
if(code != 0) {
return -1;
}
return 0; return 0;
} }
...@@ -48,9 +48,15 @@ int32_t walInit() { ...@@ -48,9 +48,15 @@ int32_t walInit() {
int8_t old = atomic_val_compare_exchange_8(&tsWal.inited, 0, 1); int8_t old = atomic_val_compare_exchange_8(&tsWal.inited, 0, 1);
if(old == 1) return 0; if(old == 1) return 0;
int code = tfInit();
if(code != 0) {
wError("failed to init tfile since %s", tstrerror(code));
atomic_store_8(&tsWal.inited, 0);
return code;
}
tsWal.refSetId = taosOpenRef(TSDB_MIN_VNODES, walFreeObj); tsWal.refSetId = taosOpenRef(TSDB_MIN_VNODES, walFreeObj);
int code = walCreateThread(); code = walCreateThread();
if (code != 0) { if (code != 0) {
wError("failed to init wal module since %s", tstrerror(code)); wError("failed to init wal module since %s", tstrerror(code));
atomic_store_8(&tsWal.inited, 0); atomic_store_8(&tsWal.inited, 0);
...@@ -74,8 +80,8 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) { ...@@ -74,8 +80,8 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return NULL; return NULL;
} }
pWal->curLogTfd = -1; pWal->writeLogTfd = -1;
pWal->curIdxTfd = -1; pWal->writeIdxTfd = -1;
//set config //set config
pWal->vgId = pCfg->vgId; pWal->vgId = pCfg->vgId;
...@@ -138,8 +144,8 @@ void walClose(SWal *pWal) { ...@@ -138,8 +144,8 @@ void walClose(SWal *pWal) {
if (pWal == NULL) return; if (pWal == NULL) return;
pthread_mutex_lock(&pWal->mutex); pthread_mutex_lock(&pWal->mutex);
tfClose(pWal->curLogTfd); tfClose(pWal->writeLogTfd);
tfClose(pWal->curIdxTfd); tfClose(pWal->writeIdxTfd);
/*taosArrayDestroy(pWal->fileInfoSet);*/ /*taosArrayDestroy(pWal->fileInfoSet);*/
/*pWal->fileInfoSet = NULL;*/ /*pWal->fileInfoSet = NULL;*/
pthread_mutex_unlock(&pWal->mutex); pthread_mutex_unlock(&pWal->mutex);
...@@ -165,8 +171,8 @@ static void walFreeObj(void *wal) { ...@@ -165,8 +171,8 @@ static void walFreeObj(void *wal) {
SWal *pWal = wal; SWal *pWal = wal;
wDebug("vgId:%d, wal:%p is freed", pWal->vgId, pWal); wDebug("vgId:%d, wal:%p is freed", pWal->vgId, pWal);
tfClose(pWal->curLogTfd); tfClose(pWal->writeLogTfd);
tfClose(pWal->curIdxTfd); tfClose(pWal->writeIdxTfd);
taosArrayDestroy(pWal->fileInfoSet); taosArrayDestroy(pWal->fileInfoSet);
pWal->fileInfoSet = NULL; pWal->fileInfoSet = NULL;
taosArrayDestroy(pWal->fileInfoSet); taosArrayDestroy(pWal->fileInfoSet);
...@@ -197,7 +203,7 @@ static void walFsyncAll() { ...@@ -197,7 +203,7 @@ static void walFsyncAll() {
while (pWal) { while (pWal) {
if (walNeedFsync(pWal)) { if (walNeedFsync(pWal)) {
wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, atomic_load_32(&tsWal.seq)); wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, atomic_load_32(&tsWal.seq));
int32_t code = tfFsync(pWal->curLogTfd); int32_t code = tfFsync(pWal->writeLogTfd);
if (code != 0) { if (code != 0) {
wError("vgId:%d, file:%"PRId64".log, failed to fsync since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(code)); wError("vgId:%d, file:%"PRId64".log, failed to fsync since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(code));
} }
......
...@@ -42,7 +42,7 @@ int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) { ...@@ -42,7 +42,7 @@ int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) {
} }
*ppHead = ptr; *ppHead = ptr;
} }
if(tfRead(pWal->curLogTfd, *ppHead, sizeof(SWalHead)) != sizeof(SWalHead)) { if(tfRead(pWal->writeLogTfd, *ppHead, sizeof(SWalHead)) != sizeof(SWalHead)) {
return -1; return -1;
} }
//TODO: endian compatibility processing after read //TODO: endian compatibility processing after read
...@@ -55,7 +55,7 @@ int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) { ...@@ -55,7 +55,7 @@ int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) {
*ppHead = NULL; *ppHead = NULL;
return -1; return -1;
} }
if(tfRead(pWal->curLogTfd, (*ppHead)->cont, (*ppHead)->len) != (*ppHead)->len) { if(tfRead(pWal->writeLogTfd, (*ppHead)->cont, (*ppHead)->len) != (*ppHead)->len) {
return -1; return -1;
} }
//TODO: endian compatibility processing after read //TODO: endian compatibility processing after read
......
...@@ -44,18 +44,39 @@ int32_t walRollback(SWal *pWal, int64_t ver) { ...@@ -44,18 +44,39 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
int32_t walTakeSnapshot(SWal *pWal, int64_t ver) { int32_t walTakeSnapshot(SWal *pWal, int64_t ver) {
pWal->snapshotVersion = ver; pWal->snapshotVersion = ver;
int ts = taosGetTimestampSec();
int deleteCnt = 0;
int64_t newTotSize = pWal->totSize;
WalFileInfo tmp; WalFileInfo tmp;
tmp.firstVer = ver; tmp.firstVer = ver;
//mark files safe to delete //mark files safe to delete
WalFileInfo* pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE); WalFileInfo* pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE);
//iterate files, until the searched result //iterate files, until the searched result
//if totSize > rtSize, delete for(WalFileInfo* iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) {
//if createTs > retentionTs, delete if(pWal->totSize > pWal->retentionSize ||
iter->closeTs + pWal->retentionPeriod > ts) {
//delete according to file size or close time
deleteCnt++;
newTotSize -= iter->fileSize;
}
}
char fnameStr[WAL_FILE_LEN];
//remove file
for(int i = 0; i < deleteCnt; i++) {
WalFileInfo* pInfo = taosArrayGet(pWal->fileInfoSet, i);
walBuildLogName(pWal, pInfo->firstVer, fnameStr);
remove(fnameStr);
walBuildIdxName(pWal, pInfo->firstVer, fnameStr);
remove(fnameStr);
}
//save snapshot ver, commit ver //save snapshot ver, commit ver
//make new array, remove files //make new array, remove files
taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt);
pWal->totSize = newTotSize;
return 0; return 0;
} }
...@@ -153,14 +174,14 @@ void walRemoveAllOldFiles(void *handle) { ...@@ -153,14 +174,14 @@ void walRemoveAllOldFiles(void *handle) {
int walRoll(SWal *pWal) { int walRoll(SWal *pWal) {
int code = 0; int code = 0;
if(pWal->curIdxTfd != -1) { if(pWal->writeIdxTfd != -1) {
code = tfClose(pWal->curIdxTfd); code = tfClose(pWal->writeIdxTfd);
if(code != 0) { if(code != 0) {
return -1; return -1;
} }
} }
if(pWal->curLogTfd != -1) { if(pWal->writeLogTfd != -1) {
code = tfClose(pWal->curLogTfd); code = tfClose(pWal->writeLogTfd);
if(code != 0) { if(code != 0) {
return -1; return -1;
} }
...@@ -188,8 +209,8 @@ int walRoll(SWal *pWal) { ...@@ -188,8 +209,8 @@ int walRoll(SWal *pWal) {
} }
//switch file //switch file
pWal->curIdxTfd = idxTfd; pWal->writeIdxTfd = idxTfd;
pWal->curLogTfd = logTfd; pWal->writeLogTfd = logTfd;
//change status //change status
pWal->curStatus = WAL_CUR_FILE_WRITABLE & WAL_CUR_POS_WRITABLE; pWal->curStatus = WAL_CUR_FILE_WRITABLE & WAL_CUR_POS_WRITABLE;
...@@ -215,8 +236,8 @@ int walChangeFileToLast(SWal *pWal) { ...@@ -215,8 +236,8 @@ int walChangeFileToLast(SWal *pWal) {
return -1; return -1;
} }
//switch file //switch file
pWal->curIdxTfd = idxTfd; pWal->writeIdxTfd = idxTfd;
pWal->curLogTfd = logTfd; pWal->writeLogTfd = logTfd;
//change status //change status
pWal->curVersion = fileFirstVer; pWal->curVersion = fileFirstVer;
pWal->curStatus = WAL_CUR_FILE_WRITABLE; pWal->curStatus = WAL_CUR_FILE_WRITABLE;
...@@ -226,15 +247,14 @@ int walChangeFileToLast(SWal *pWal) { ...@@ -226,15 +247,14 @@ int walChangeFileToLast(SWal *pWal) {
static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
int code = 0; int code = 0;
//get index file //get index file
if(!tfValid(pWal->curIdxTfd)) { if(!tfValid(pWal->writeIdxTfd)) {
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
WalFileInfo* pInfo = taosArrayGet(pWal->fileInfoSet, pWal->fileCursor); wError("vgId:%d, file:%"PRId64".idx, failed to open since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(errno));
wError("vgId:%d, file:%"PRId64".idx, failed to open since %s", pWal->vgId, pInfo->firstVer, strerror(errno));
return code; return code;
} }
int64_t writeBuf[2] = { ver, offset }; int64_t writeBuf[2] = { ver, offset };
int size = tfWrite(pWal->curIdxTfd, writeBuf, sizeof(writeBuf)); int size = tfWrite(pWal->writeIdxTfd, writeBuf, sizeof(writeBuf));
if(size != sizeof(writeBuf)) { if(size != sizeof(writeBuf)) {
return -1; return -1;
} }
...@@ -278,13 +298,13 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i ...@@ -278,13 +298,13 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
pthread_mutex_lock(&pWal->mutex); pthread_mutex_lock(&pWal->mutex);
if (tfWrite(pWal->curLogTfd, &pWal->head, sizeof(SWalHead)) != sizeof(SWalHead)) { if (tfWrite(pWal->writeLogTfd, &pWal->head, sizeof(SWalHead)) != sizeof(SWalHead)) {
//ftruncate //ftruncate
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(errno)); wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(errno));
} }
if (tfWrite(pWal->curLogTfd, &body, bodyLen) != bodyLen) { if (tfWrite(pWal->writeLogTfd, &body, bodyLen) != bodyLen) {
//ftruncate //ftruncate
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(errno)); wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(errno));
...@@ -296,6 +316,7 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i ...@@ -296,6 +316,7 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
//set status //set status
pWal->lastVersion = index; pWal->lastVersion = index;
pWal->totSize += sizeof(SWalHead) + bodyLen;
walGetCurFileInfo(pWal)->lastVer = index; walGetCurFileInfo(pWal)->lastVer = index;
walGetCurFileInfo(pWal)->fileSize += sizeof(SWalHead) + bodyLen; walGetCurFileInfo(pWal)->fileSize += sizeof(SWalHead) + bodyLen;
...@@ -305,11 +326,11 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i ...@@ -305,11 +326,11 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
} }
void walFsync(SWal *pWal, bool forceFsync) { void walFsync(SWal *pWal, bool forceFsync) {
if (pWal == NULL || !tfValid(pWal->curLogTfd)) return; if (pWal == NULL || !tfValid(pWal->writeLogTfd)) return;
if (forceFsync || (pWal->level == TAOS_WAL_FSYNC && pWal->fsyncPeriod == 0)) { if (forceFsync || (pWal->level == TAOS_WAL_FSYNC && pWal->fsyncPeriod == 0)) {
wTrace("vgId:%d, fileId:%"PRId64".log, do fsync", pWal->vgId, walGetCurFileFirstVer(pWal)); wTrace("vgId:%d, fileId:%"PRId64".log, do fsync", pWal->vgId, walGetCurFileFirstVer(pWal));
if (tfFsync(pWal->curLogTfd) < 0) { if (tfFsync(pWal->writeLogTfd) < 0) {
wError("vgId:%d, file:%"PRId64".log, fsync failed since %s", pWal->vgId, walGetCurFileFirstVer(pWal), strerror(errno)); wError("vgId:%d, file:%"PRId64".log, fsync failed since %s", pWal->vgId, walGetCurFileFirstVer(pWal), strerror(errno));
} }
} }
...@@ -408,10 +429,10 @@ static int64_t walGetOffset(SWal* pWal, int64_t ver) { ...@@ -408,10 +429,10 @@ static int64_t walGetOffset(SWal* pWal, int64_t ver) {
} }
static void walFtruncate(SWal *pWal, int64_t ver) { static void walFtruncate(SWal *pWal, int64_t ver) {
int64_t tfd = pWal->curLogTfd; int64_t tfd = pWal->writeLogTfd;
tfFtruncate(tfd, ver); tfFtruncate(tfd, ver);
tfFsync(tfd); tfFsync(tfd);
tfd = pWal->curIdxTfd; tfd = pWal->writeIdxTfd;
tfFtruncate(tfd, ver * WAL_IDX_ENTRY_SIZE); tfFtruncate(tfd, ver * WAL_IDX_ENTRY_SIZE);
tfFsync(tfd); tfFsync(tfd);
} }
......
...@@ -3,7 +3,6 @@ ...@@ -3,7 +3,6 @@
#include <iostream> #include <iostream>
#include <queue> #include <queue>
#include "tfile.h"
#include "walInt.h" #include "walInt.h"
class WalCleanEnv : public ::testing::Test { class WalCleanEnv : public ::testing::Test {
...@@ -11,13 +10,10 @@ class WalCleanEnv : public ::testing::Test { ...@@ -11,13 +10,10 @@ class WalCleanEnv : public ::testing::Test {
static void SetUpTestCase() { static void SetUpTestCase() {
int code = walInit(); int code = walInit();
ASSERT(code == 0); ASSERT(code == 0);
code = tfInit();
ASSERT(code == 0);
} }
static void TearDownTestCase() { static void TearDownTestCase() {
walCleanUp(); walCleanUp();
tfCleanup();
} }
void SetUp() override { void SetUp() override {
...@@ -45,13 +41,10 @@ class WalKeepEnv : public ::testing::Test { ...@@ -45,13 +41,10 @@ class WalKeepEnv : public ::testing::Test {
static void SetUpTestCase() { static void SetUpTestCase() {
int code = walInit(); int code = walInit();
ASSERT(code == 0); ASSERT(code == 0);
code = tfInit();
ASSERT(code == 0);
} }
static void TearDownTestCase() { static void TearDownTestCase() {
walCleanUp(); walCleanUp();
tfCleanup();
} }
void SetUp() override { void SetUp() override {
......
...@@ -237,6 +237,16 @@ void taosArraySet(SArray* pArray, size_t index, void* pData) { ...@@ -237,6 +237,16 @@ void taosArraySet(SArray* pArray, size_t index, void* pData) {
memcpy(TARRAY_GET_ELEM(pArray, index), pData, pArray->elemSize); memcpy(TARRAY_GET_ELEM(pArray, index), pData, pArray->elemSize);
} }
void taosArrayPopFrontBatch(SArray* pArray, size_t cnt) {
assert(cnt <= pArray->size);
pArray->size = pArray->size - cnt;
if(pArray->size == 0) {
pArray->size = 0;
return;
}
memmove(pArray->pData, (char*)pArray->pData + cnt * pArray->elemSize, pArray->size);
}
void taosArrayRemove(SArray* pArray, size_t index) { void taosArrayRemove(SArray* pArray, size_t index) {
assert(index < pArray->size); assert(index < pArray->size);
......
...@@ -22,20 +22,26 @@ ...@@ -22,20 +22,26 @@
static int32_t tsFileRsetId = -1; static int32_t tsFileRsetId = -1;
static int8_t tfInited = 0;
static void tfCloseFile(void *p) { static void tfCloseFile(void *p) {
taosCloseFile((int32_t)(uintptr_t)p); taosCloseFile((int32_t)(uintptr_t)p);
} }
int32_t tfInit() { int32_t tfInit() {
int8_t old = atomic_val_compare_exchange_8(&tfInited, 0, 1);
if(old == 1) return 0;
tsFileRsetId = taosOpenRef(2000, tfCloseFile); tsFileRsetId = taosOpenRef(2000, tfCloseFile);
if (tsFileRsetId > 0) { if (tsFileRsetId > 0) {
return 0; return 0;
} else { } else {
atomic_store_8(&tfInited, 0);
return -1; return -1;
} }
} }
void tfCleanup() { void tfCleanup() {
atomic_store_8(&tfInited, 0);
if (tsFileRsetId >= 0) taosCloseRef(tsFileRsetId); if (tsFileRsetId >= 0) taosCloseRef(tsFileRsetId);
tsFileRsetId = -1; tsFileRsetId = -1;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册