未验证 提交 7ce98762 编写于 作者: H Hongze Cheng 提交者: GitHub

Merge pull request #16782 from taosdata/fix/TD-19055

fix(tdb): use rbtree with dirty pages' list
......@@ -34,6 +34,22 @@ static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage
static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage);
static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage);
static FORCE_INLINE int32_t pageCmpFn(const void *lhs, const void *rhs) {
SPage *pPageL = (SPage *)(((uint8_t *)lhs) - sizeof(SRBTreeNode));
SPage *pPageR = (SPage *)(((uint8_t *)rhs) - sizeof(SRBTreeNode));
SPgno pgnoL = TDB_PAGE_PGNO(pPageL);
SPgno pgnoR = TDB_PAGE_PGNO(pPageR);
if (pgnoL < pgnoR) {
return -1;
} else if (pgnoL > pgnoR) {
return 1;
} else {
return 0;
}
}
int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager) {
uint8_t *pPtr;
SPager *pPager;
......@@ -83,6 +99,8 @@ int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager) {
ret = tdbGetFileSize(pPager->fd, pPager->pageSize, &(pPager->dbOrigSize));
pPager->dbFileSize = pPager->dbOrigSize;
tRBTreeCreate(&pPager->rbt, pageCmpFn);
*ppPager = pPager;
return 0;
}
......@@ -167,7 +185,7 @@ int tdbPagerWrite(SPager *pPager, SPage *pPage) {
// ref page one more time so the page will not be release
tdbRefPage(pPage);
tdbDebug("pcache/mdirty page %p/%d/%d", pPage, TDB_PAGE_PGNO(pPage), pPage->id);
/*
// Set page as dirty
pPage->isDirty = 1;
......@@ -185,6 +203,8 @@ int tdbPagerWrite(SPager *pPager, SPage *pPage) {
ASSERT(*ppPage == NULL || TDB_PAGE_PGNO(*ppPage) > TDB_PAGE_PGNO(pPage));
pPage->pDirtyNext = *ppPage;
*ppPage = pPage;
*/
tRBTreePut(&pPager->rbt, (SRBTreeNode *)pPage);
// Write page to journal if neccessary
if (TDB_PAGE_PGNO(pPage) <= pPager->dbOrigSize) {
......@@ -228,6 +248,23 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
return 0;
}
SRBTreeIter iter = tRBTreeIterCreate(&pPager->rbt, 1);
SRBTreeNode *pNode = NULL;
while ((pNode = tRBTreeIterNext(&iter)) != NULL) {
pPage = (SPage *)pNode;
ret = tdbPagerWritePageToDB(pPager, pPage);
if (ret < 0) {
ASSERT(0);
return -1;
}
pPage->isDirty = 0;
tdbPCacheRelease(pPager->pCache, pPage, pTxn);
}
tRBTreeCreate(&pPager->rbt, pageCmpFn);
/*
// loop to write the dirty pages to file
for (pPage = pPager->pDirty; pPage; pPage = pPage->pDirtyNext) {
// TODO: update the page footer
......@@ -238,9 +275,6 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
}
}
tdbTrace("tdbttl commit:%p, %d", pPager, pPager->dbOrigSize);
pPager->dbOrigSize = pPager->dbFileSize;
// release the page
for (pPage = pPager->pDirty; pPage; pPage = pPager->pDirty) {
pPager->pDirty = pPage->pDirtyNext;
......@@ -250,6 +284,9 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
tdbPCacheRelease(pPager->pCache, pPage, pTxn);
}
*/
tdbTrace("tdbttl commit:%p, %d", pPager, pPager->dbOrigSize);
pPager->dbOrigSize = pPager->dbFileSize;
// sync the db file
tdbOsFSync(pPager->fd);
......@@ -497,7 +534,14 @@ static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage) {
return 0;
}
/*
struct TdFile {
TdThreadRwlock rwlock;
int refId;
int fd;
FILE *fp;
} TdFile;
*/
static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage) {
i64 offset;
int ret;
......@@ -514,6 +558,7 @@ static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage) {
return -1;
}
// pwrite(pPager->fd->fd, pPage->pData, pPage->pageSize, offset);
return 0;
}
......
......@@ -19,6 +19,7 @@
#include "tdb.h"
#include "tlog.h"
#include "trbtree.h"
#ifdef __cplusplus
extern "C" {
......@@ -256,6 +257,7 @@ typedef struct {
#pragma pack(pop)
struct SPage {
SRBTreeNode node; // must be the first field for pageCmpFn to work
tdb_spinlock_t lock;
int pageSize;
u8 *pData;
......@@ -280,13 +282,13 @@ struct SPage {
static inline i32 tdbRefPage(SPage *pPage) {
i32 nRef = atomic_add_fetch_32(&((pPage)->nRef), 1);
tdbTrace("ref page %p/%d, nRef %d", pPage, pPage->id, nRef);
// tdbTrace("ref page %p/%d, nRef %d", pPage, pPage->id, nRef);
return nRef;
}
static inline i32 tdbUnrefPage(SPage *pPage) {
i32 nRef = atomic_sub_fetch_32(&((pPage)->nRef), 1);
tdbTrace("unref page %p/%d, nRef %d", pPage, pPage->id, nRef);
// tdbTrace("unref page %p/%d, nRef %d", pPage, pPage->id, nRef);
return nRef;
}
......@@ -389,6 +391,7 @@ struct SPager {
SPgno dbFileSize;
SPgno dbOrigSize;
SPage *pDirty;
SRBTree rbt;
u8 inTran;
SPager *pNext; // used by TDB
SPager *pHashNext; // used by TDB
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册