未验证 提交 713c20c4 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #9028 from taosdata/feature/tq

make tfinit called once
......@@ -91,7 +91,8 @@ typedef struct SWal {
int32_t fsyncPeriod; // millisecond
int32_t rollPeriod; // second
int64_t segSize;
int64_t rtSize;
int64_t retentionSize;
int32_t retentionPeriod;
EWalType level;
//total size
int64_t totSize;
......@@ -99,31 +100,24 @@ typedef struct SWal {
int32_t fsyncSeq;
//reference
int64_t refId;
//current tfd
int64_t curLogTfd;
int64_t curIdxTfd;
//write tfd
int64_t writeLogTfd;
int64_t writeIdxTfd;
//read tfd
int64_t readLogTfd;
int64_t readIdxTfd;
//current version
int64_t curVersion;
//current file version
//int64_t curFileFirstVersion;
//int64_t curFileLastVersion;
//wal lifecycle
int64_t firstVersion;
int64_t snapshotVersion;
int64_t commitVersion;
int64_t lastVersion;
//last file
//int64_t lastFileName;
//roll status
int64_t lastRollSeq;
//int64_t lastFileWriteSize;
//file set
int32_t fileCursor;
int32_t writeCur;
int32_t readCur;
SArray* fileInfoSet;
//ctl
int32_t curStatus;
......
......@@ -146,6 +146,13 @@ void* taosArrayInsert(SArray* pArray, size_t index, void* pData);
*/
void taosArraySet(SArray* pArray, size_t index, void* pData);
/**
* remove some data entry from front
* @param pArray
* @param cnt
*/
void taosArrayPopFrontBatch(SArray* pArray, size_t cnt);
/**
* remove data entry of the given index
* @param pArray
......
......@@ -49,26 +49,26 @@ static inline int64_t walGetLastFileFirstVer(SWal* pWal) {
}
static inline int64_t walGetCurFileFirstVer(SWal* pWal) {
WalFileInfo* pInfo = (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->fileCursor);
WalFileInfo* pInfo = (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur);
return pInfo->firstVer;
}
static inline int64_t walGetCurFileLastVer(SWal* pWal) {
WalFileInfo* pInfo = (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->fileCursor);
WalFileInfo* pInfo = (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur);
return pInfo->firstVer;
}
static inline int64_t walGetCurFileOffset(SWal* pWal) {
WalFileInfo* pInfo = (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->fileCursor);
WalFileInfo* pInfo = (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur);
return pInfo->fileSize;
}
static inline bool walCurFileClosed(SWal* pWal) {
return taosArrayGetSize(pWal->fileInfoSet) != pWal->fileCursor;
return taosArrayGetSize(pWal->fileInfoSet) != pWal->writeCur;
}
static inline WalFileInfo* walGetCurFileInfo(SWal* pWal) {
return (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->fileCursor);
return (WalFileInfo*)taosArrayGet(pWal->fileInfoSet, pWal->writeCur);
}
static inline int walBuildLogName(SWal*pWal, int64_t fileFirstVer, char* buf) {
......
......@@ -23,25 +23,25 @@
static int walSeekFilePos(SWal* pWal, int64_t ver) {
int code = 0;
int64_t idxTfd = pWal->curIdxTfd;
int64_t logTfd = pWal->curLogTfd;
int64_t idxTfd = pWal->writeIdxTfd;
int64_t logTfd = pWal->writeLogTfd;
//seek position
int64_t offset = (ver - walGetCurFileFirstVer(pWal)) * WAL_IDX_ENTRY_SIZE;
code = tfLseek(idxTfd, offset, SEEK_SET);
if(code != 0) {
return -1;
}
int64_t readBuf[2];
code = tfRead(idxTfd, readBuf, sizeof(readBuf));
if(code != 0) {
return -1;
}
//TODO:deserialize
ASSERT(readBuf[0] == ver);
code = tfLseek(logTfd, readBuf[1], SEEK_CUR);
if (code != 0) {
return -1;
}
/*pWal->curLogOffset = readBuf[1];*/
pWal->curVersion = ver;
......@@ -52,11 +52,11 @@ static int walChangeFile(SWal *pWal, int64_t ver) {
int code = 0;
int64_t idxTfd, logTfd;
char fnameStr[WAL_FILE_LEN];
code = tfClose(pWal->curLogTfd);
code = tfClose(pWal->writeLogTfd);
if(code != 0) {
//TODO
}
code = tfClose(pWal->curIdxTfd);
code = tfClose(pWal->writeIdxTfd);
if(code != 0) {
//TODO
}
......@@ -81,14 +81,14 @@ static int walChangeFile(SWal *pWal, int64_t ver) {
logTfd = tfOpenReadWrite(fnameStr);
}
pWal->curLogTfd = logTfd;
pWal->curIdxTfd = idxTfd;
pWal->writeLogTfd = logTfd;
pWal->writeIdxTfd = idxTfd;
return code;
}
int walSeekVer(SWal *pWal, int64_t ver) {
if((!(pWal->curStatus & WAL_CUR_FAILED))
&& ver == pWal->curVersion) {
int code;
if((!(pWal->curStatus & WAL_CUR_FAILED)) && ver == pWal->curVersion) {
return 0;
}
if(ver > pWal->lastVersion) {
......@@ -103,9 +103,15 @@ int walSeekVer(SWal *pWal, int64_t ver) {
//TODO: seek snapshotted log, invalid in some cases
}
if(ver < walGetCurFileFirstVer(pWal) || (ver > walGetCurFileLastVer(pWal))) {
walChangeFile(pWal, ver);
code = walChangeFile(pWal, ver);
if(code != 0) {
return -1;
}
}
walSeekFilePos(pWal, ver);
code = walSeekFilePos(pWal, ver);
if(code != 0) {
return -1;
}
return 0;
}
......@@ -48,9 +48,15 @@ int32_t walInit() {
int8_t old = atomic_val_compare_exchange_8(&tsWal.inited, 0, 1);
if(old == 1) return 0;
int code = tfInit();
if(code != 0) {
wError("failed to init tfile since %s", tstrerror(code));
atomic_store_8(&tsWal.inited, 0);
return code;
}
tsWal.refSetId = taosOpenRef(TSDB_MIN_VNODES, walFreeObj);
int code = walCreateThread();
code = walCreateThread();
if (code != 0) {
wError("failed to init wal module since %s", tstrerror(code));
atomic_store_8(&tsWal.inited, 0);
......@@ -74,8 +80,8 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL;
}
pWal->curLogTfd = -1;
pWal->curIdxTfd = -1;
pWal->writeLogTfd = -1;
pWal->writeIdxTfd = -1;
//set config
pWal->vgId = pCfg->vgId;
......@@ -138,8 +144,8 @@ void walClose(SWal *pWal) {
if (pWal == NULL) return;
pthread_mutex_lock(&pWal->mutex);
tfClose(pWal->curLogTfd);
tfClose(pWal->curIdxTfd);
tfClose(pWal->writeLogTfd);
tfClose(pWal->writeIdxTfd);
/*taosArrayDestroy(pWal->fileInfoSet);*/
/*pWal->fileInfoSet = NULL;*/
pthread_mutex_unlock(&pWal->mutex);
......@@ -165,8 +171,8 @@ static void walFreeObj(void *wal) {
SWal *pWal = wal;
wDebug("vgId:%d, wal:%p is freed", pWal->vgId, pWal);
tfClose(pWal->curLogTfd);
tfClose(pWal->curIdxTfd);
tfClose(pWal->writeLogTfd);
tfClose(pWal->writeIdxTfd);
taosArrayDestroy(pWal->fileInfoSet);
pWal->fileInfoSet = NULL;
taosArrayDestroy(pWal->fileInfoSet);
......@@ -197,7 +203,7 @@ static void walFsyncAll() {
while (pWal) {
if (walNeedFsync(pWal)) {
wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, atomic_load_32(&tsWal.seq));
int32_t code = tfFsync(pWal->curLogTfd);
int32_t code = tfFsync(pWal->writeLogTfd);
if (code != 0) {
wError("vgId:%d, file:%"PRId64".log, failed to fsync since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(code));
}
......
......@@ -42,7 +42,7 @@ int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) {
}
*ppHead = ptr;
}
if(tfRead(pWal->curLogTfd, *ppHead, sizeof(SWalHead)) != sizeof(SWalHead)) {
if(tfRead(pWal->writeLogTfd, *ppHead, sizeof(SWalHead)) != sizeof(SWalHead)) {
return -1;
}
//TODO: endian compatibility processing after read
......@@ -55,7 +55,7 @@ int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) {
*ppHead = NULL;
return -1;
}
if(tfRead(pWal->curLogTfd, (*ppHead)->cont, (*ppHead)->len) != (*ppHead)->len) {
if(tfRead(pWal->writeLogTfd, (*ppHead)->cont, (*ppHead)->len) != (*ppHead)->len) {
return -1;
}
//TODO: endian compatibility processing after read
......
......@@ -44,18 +44,39 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
int32_t walTakeSnapshot(SWal *pWal, int64_t ver) {
pWal->snapshotVersion = ver;
int ts = taosGetTimestampSec();
int deleteCnt = 0;
int64_t newTotSize = pWal->totSize;
WalFileInfo tmp;
tmp.firstVer = ver;
//mark files safe to delete
WalFileInfo* pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE);
//iterate files, until the searched result
//if totSize > rtSize, delete
//if createTs > retentionTs, delete
for(WalFileInfo* iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) {
if(pWal->totSize > pWal->retentionSize ||
iter->closeTs + pWal->retentionPeriod > ts) {
//delete according to file size or close time
deleteCnt++;
newTotSize -= iter->fileSize;
}
}
char fnameStr[WAL_FILE_LEN];
//remove file
for(int i = 0; i < deleteCnt; i++) {
WalFileInfo* pInfo = taosArrayGet(pWal->fileInfoSet, i);
walBuildLogName(pWal, pInfo->firstVer, fnameStr);
remove(fnameStr);
walBuildIdxName(pWal, pInfo->firstVer, fnameStr);
remove(fnameStr);
}
//save snapshot ver, commit ver
//make new array, remove files
taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt);
pWal->totSize = newTotSize;
return 0;
}
......@@ -153,14 +174,14 @@ void walRemoveAllOldFiles(void *handle) {
int walRoll(SWal *pWal) {
int code = 0;
if(pWal->curIdxTfd != -1) {
code = tfClose(pWal->curIdxTfd);
if(pWal->writeIdxTfd != -1) {
code = tfClose(pWal->writeIdxTfd);
if(code != 0) {
return -1;
}
}
if(pWal->curLogTfd != -1) {
code = tfClose(pWal->curLogTfd);
if(pWal->writeLogTfd != -1) {
code = tfClose(pWal->writeLogTfd);
if(code != 0) {
return -1;
}
......@@ -188,8 +209,8 @@ int walRoll(SWal *pWal) {
}
//switch file
pWal->curIdxTfd = idxTfd;
pWal->curLogTfd = logTfd;
pWal->writeIdxTfd = idxTfd;
pWal->writeLogTfd = logTfd;
//change status
pWal->curStatus = WAL_CUR_FILE_WRITABLE & WAL_CUR_POS_WRITABLE;
......@@ -215,8 +236,8 @@ int walChangeFileToLast(SWal *pWal) {
return -1;
}
//switch file
pWal->curIdxTfd = idxTfd;
pWal->curLogTfd = logTfd;
pWal->writeIdxTfd = idxTfd;
pWal->writeLogTfd = logTfd;
//change status
pWal->curVersion = fileFirstVer;
pWal->curStatus = WAL_CUR_FILE_WRITABLE;
......@@ -226,15 +247,14 @@ int walChangeFileToLast(SWal *pWal) {
static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
int code = 0;
//get index file
if(!tfValid(pWal->curIdxTfd)) {
if(!tfValid(pWal->writeIdxTfd)) {
code = TAOS_SYSTEM_ERROR(errno);
WalFileInfo* pInfo = taosArrayGet(pWal->fileInfoSet, pWal->fileCursor);
wError("vgId:%d, file:%"PRId64".idx, failed to open since %s", pWal->vgId, pInfo->firstVer, strerror(errno));
wError("vgId:%d, file:%"PRId64".idx, failed to open since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(errno));
return code;
}
int64_t writeBuf[2] = { ver, offset };
int size = tfWrite(pWal->curIdxTfd, writeBuf, sizeof(writeBuf));
int size = tfWrite(pWal->writeIdxTfd, writeBuf, sizeof(writeBuf));
if(size != sizeof(writeBuf)) {
return -1;
}
......@@ -278,13 +298,13 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
pthread_mutex_lock(&pWal->mutex);
if (tfWrite(pWal->curLogTfd, &pWal->head, sizeof(SWalHead)) != sizeof(SWalHead)) {
if (tfWrite(pWal->writeLogTfd, &pWal->head, sizeof(SWalHead)) != sizeof(SWalHead)) {
//ftruncate
code = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(errno));
}
if (tfWrite(pWal->curLogTfd, &body, bodyLen) != bodyLen) {
if (tfWrite(pWal->writeLogTfd, &body, bodyLen) != bodyLen) {
//ftruncate
code = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->vgId, walGetLastFileFirstVer(pWal), strerror(errno));
......@@ -296,6 +316,7 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
//set status
pWal->lastVersion = index;
pWal->totSize += sizeof(SWalHead) + bodyLen;
walGetCurFileInfo(pWal)->lastVer = index;
walGetCurFileInfo(pWal)->fileSize += sizeof(SWalHead) + bodyLen;
......@@ -305,11 +326,11 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
}
void walFsync(SWal *pWal, bool forceFsync) {
if (pWal == NULL || !tfValid(pWal->curLogTfd)) return;
if (pWal == NULL || !tfValid(pWal->writeLogTfd)) return;
if (forceFsync || (pWal->level == TAOS_WAL_FSYNC && pWal->fsyncPeriod == 0)) {
wTrace("vgId:%d, fileId:%"PRId64".log, do fsync", pWal->vgId, walGetCurFileFirstVer(pWal));
if (tfFsync(pWal->curLogTfd) < 0) {
if (tfFsync(pWal->writeLogTfd) < 0) {
wError("vgId:%d, file:%"PRId64".log, fsync failed since %s", pWal->vgId, walGetCurFileFirstVer(pWal), strerror(errno));
}
}
......@@ -408,10 +429,10 @@ static int64_t walGetOffset(SWal* pWal, int64_t ver) {
}
static void walFtruncate(SWal *pWal, int64_t ver) {
int64_t tfd = pWal->curLogTfd;
int64_t tfd = pWal->writeLogTfd;
tfFtruncate(tfd, ver);
tfFsync(tfd);
tfd = pWal->curIdxTfd;
tfd = pWal->writeIdxTfd;
tfFtruncate(tfd, ver * WAL_IDX_ENTRY_SIZE);
tfFsync(tfd);
}
......
......@@ -3,7 +3,6 @@
#include <iostream>
#include <queue>
#include "tfile.h"
#include "walInt.h"
class WalCleanEnv : public ::testing::Test {
......@@ -11,13 +10,10 @@ class WalCleanEnv : public ::testing::Test {
static void SetUpTestCase() {
int code = walInit();
ASSERT(code == 0);
code = tfInit();
ASSERT(code == 0);
}
static void TearDownTestCase() {
walCleanUp();
tfCleanup();
}
void SetUp() override {
......@@ -45,13 +41,10 @@ class WalKeepEnv : public ::testing::Test {
static void SetUpTestCase() {
int code = walInit();
ASSERT(code == 0);
code = tfInit();
ASSERT(code == 0);
}
static void TearDownTestCase() {
walCleanUp();
tfCleanup();
}
void SetUp() override {
......
......@@ -237,6 +237,16 @@ void taosArraySet(SArray* pArray, size_t index, void* pData) {
memcpy(TARRAY_GET_ELEM(pArray, index), pData, pArray->elemSize);
}
void taosArrayPopFrontBatch(SArray* pArray, size_t cnt) {
assert(cnt <= pArray->size);
pArray->size = pArray->size - cnt;
if(pArray->size == 0) {
pArray->size = 0;
return;
}
memmove(pArray->pData, (char*)pArray->pData + cnt * pArray->elemSize, pArray->size);
}
void taosArrayRemove(SArray* pArray, size_t index) {
assert(index < pArray->size);
......
......@@ -22,20 +22,26 @@
static int32_t tsFileRsetId = -1;
static int8_t tfInited = 0;
static void tfCloseFile(void *p) {
taosCloseFile((int32_t)(uintptr_t)p);
}
int32_t tfInit() {
int8_t old = atomic_val_compare_exchange_8(&tfInited, 0, 1);
if(old == 1) return 0;
tsFileRsetId = taosOpenRef(2000, tfCloseFile);
if (tsFileRsetId > 0) {
return 0;
} else {
atomic_store_8(&tfInited, 0);
return -1;
}
}
void tfCleanup() {
atomic_store_8(&tfInited, 0);
if (tsFileRsetId >= 0) taosCloseRef(tsFileRsetId);
tsFileRsetId = -1;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册