Unverified commit 45059915, authored by Shengliang Guan, committed by GitHub

Merge pull request #18097 from taosdata/fix/TD-19579

fix: flush pages to get buffer ready for fetching
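In short, the change lets tdbPCacheFetch return NULL when no cache frame can be handed out, and makes tdbPagerFetchPage react by flushing the pager's dirty pages and retrying instead of asserting (see the last pager hunk below). A minimal standalone simulation of that flush-and-retry pattern, assuming a toy frame pool; Pool, pool_fetch, and pool_flush_dirty are illustrative names, not TDB APIs:

#include <stdio.h>

#define NFRAMES 4

/* Toy frame pool: a frame is available only when it is neither pinned nor dirty. */
typedef struct {
  int pinned[NFRAMES];
  int dirty[NFRAMES];
} Pool;

/* Return a frame index, or -1 when every frame is pinned or dirty. */
static int pool_fetch(Pool *p) {
  for (int i = 0; i < NFRAMES; i++) {
    if (!p->pinned[i] && !p->dirty[i]) {
      p->pinned[i] = 1;
      return i;
    }
  }
  return -1; /* cache exhausted: caller must flush and retry */
}

/* "Flush" the dirty frames (pretend to write them out), making them reclaimable. */
static void pool_flush_dirty(Pool *p) {
  for (int i = 0; i < NFRAMES; i++) p->dirty[i] = 0;
}

int main(void) {
  Pool pool = {{0}, {1, 1, 1, 1}}; /* start with every frame dirty */

  /* Mirrors the new loop in tdbPagerFetchPage: keep flushing until the fetch succeeds. */
  int frame;
  while ((frame = pool_fetch(&pool)) < 0) {
    puts("cache exhausted, flushing dirty pages");
    pool_flush_dirty(&pool);
  }
  printf("got frame %d\n", frame);
  return 0;
}

The real flush (tdbPagerFlushPage, added below) writes each dirty page to the DB file and releases it from the pager's red-black tree of dirty pages, which is what makes cache frames reusable again.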
@@ -169,7 +169,7 @@ int tdbPCacheAlter(SPCache *pCache, int32_t nPage) {
 SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, TXN *pTxn) {
   SPage *pPage;
-  i32 nRef;
+  i32 nRef = 0;
   tdbPCacheLock(pCache);
@@ -178,14 +178,17 @@ SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, TXN *pTxn) {
     nRef = tdbRefPage(pPage);
   }
-  ASSERT(pPage);
   tdbPCacheUnlock(pCache);
   // printf("thread %" PRId64 " fetch page %d pgno %d pPage %p nRef %d\n", taosGetSelfPthreadId(), pPage->id,
   // TDB_PAGE_PGNO(pPage), pPage, nRef);
-  tdbDebug("pcache/fetch page %p/%d/%d/%d", pPage, TDB_PAGE_PGNO(pPage), pPage->id, nRef);
+  if (pPage) {
+    tdbDebug("pcache/fetch page %p/%d/%d/%d", pPage, TDB_PAGE_PGNO(pPage), pPage->id, nRef);
+  } else {
+    tdbDebug("pcache/fetch page %p", pPage);
+  }
   return pPage;
 }
@@ -266,7 +269,7 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn)
   }
   // 4. Try a create new page
-  if (!pPage) {
+  if (!pPage && pTxn->xMalloc != NULL) {
     ret = tdbPageCreate(pCache->szPage, &pPage, pTxn->xMalloc, pTxn->xArg);
     if (ret < 0 || pPage == NULL) {
       // TODO
......
@@ -27,6 +27,116 @@ typedef struct {
 TDB_STATIC_ASSERT(sizeof(SFileHdr) == 128, "Size of file header is not correct");
+struct hashset_st {
+  size_t nbits;
+  size_t mask;
+  size_t capacity;
+  size_t *items;
+  size_t nitems;
+  double load_factor;
+};
+
+static const unsigned int prime = 39;
+static const unsigned int prime2 = 5009;
+
+hashset_t hashset_create(void) {
+  hashset_t set = tdbOsCalloc(1, sizeof(struct hashset_st));
+  if (!set) {
+    return NULL;
+  }
+
+  set->nbits = 4;
+  set->capacity = (size_t)(1 << set->nbits);
+  set->items = tdbOsCalloc(set->capacity, sizeof(size_t));
+  if (!set->items) {
+    tdbOsFree(set);
+    return NULL;
+  }
+  set->mask = set->capacity - 1;
+  set->nitems = 0;
+  set->load_factor = 0.75;
+
+  return set;
+}
+
+void hashset_destroy(hashset_t set) {
+  if (set) {
+    tdbOsFree(set->items);
+    tdbOsFree(set);
+  }
+}
+
+int hashset_add_member(hashset_t set, void *item) {
+  size_t value = (size_t) item;
+  size_t h;
+
+  if (value == 0) {
+    return -1;
+  }
+
+  for (h = set->mask & (prime * value); set->items[h] != 0; h = set->mask & (h + prime2)) {
+    if (set->items[h] == value) {
+      return 0;
+    }
+  }
+
+  set->items[h] = value;
+  ++set->nitems;
+  return 1;
+}
+
+int hashset_add(hashset_t set, void *item) {
+  int ret = hashset_add_member(set, item);
+
+  size_t old_capacity = set->capacity;
+  if (set->nitems >= (double)old_capacity * set->load_factor) {
+    size_t *old_items = set->items;
+    ++set->nbits;
+    set->capacity = (size_t)(1 << set->nbits);
+    set->mask = set->capacity - 1;
+    set->items = tdbOsCalloc(set->capacity, sizeof(size_t));
+    if (!set->items) {
+      return -1;
+    }
+    set->nitems = 0;
+    for (size_t i = 0; i < old_capacity; ++i) {
+      hashset_add_member(set, (void*)old_items[i]);
+    }
+    tdbOsFree(old_items);
+  }
+
+  return ret;
+}
+
+int hashset_remove(hashset_t set, void *item) {
+  size_t value = (size_t) item;
+
+  for (size_t h = set->mask & (prime * value); set->items[h] != 0; h = set->mask & (h + prime2)) {
+    if (set->items[h] == value) {
+      set->items[h] = 0;
+      --set->nitems;
+      return 1;
+    }
+  }
+
+  return 0;
+}
+
+int hashset_contains(hashset_t set, void *item) {
+  size_t value = (size_t) item;
+
+  for (size_t h = set->mask & (prime * value); set->items[h] != 0; h = set->mask & (h + prime2)) {
+    if (set->items[h] == value) {
+      return 1;
+    }
+  }
+
+  return 0;
+}
+
 #define TDB_PAGE_INITIALIZED(pPage) ((pPage)->pPager != NULL)
 static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage *, void *, int), void *arg,
@@ -209,12 +319,16 @@ int tdbPagerWrite(SPager *pPager, SPage *pPage) {
   tRBTreePut(&pPager->rbt, (SRBTreeNode *)pPage);
   // Write page to journal if neccessary
-  if (TDB_PAGE_PGNO(pPage) <= pPager->dbOrigSize) {
+  if (TDB_PAGE_PGNO(pPage) <= pPager->dbOrigSize && (pPager->jPageSet == NULL || !hashset_contains(pPager->jPageSet, (void*)((long)TDB_PAGE_PGNO(pPage))))) {
     ret = tdbPagerWritePageToJournal(pPager, pPage);
     if (ret < 0) {
       tdbError("failed to write page to journal since %s", tstrerror(terrno));
       return -1;
     }
+
+    if (pPager->jPageSet) {
+      hashset_add(pPager->jPageSet, (void*)((long)TDB_PAGE_PGNO(pPage)));
+    }
   }
   return 0;
@@ -233,6 +347,7 @@ int tdbPagerBegin(SPager *pPager, TXN *pTxn) {
     return -1;
   }
+  pPager->jPageSet = hashset_create();
   // TODO: write the size of the file
   pPager->inTran = 1;
@@ -275,6 +390,9 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
     pPage->isDirty = 0;
     tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
+    if (pPager->jPageSet) {
+      hashset_remove(pPager->jPageSet, (void*)((long)TDB_PAGE_PGNO(pPage)));
+    }
     tdbPCacheRelease(pPager->pCache, pPage, pTxn);
   }
@@ -304,6 +422,9 @@ int tdbPagerPostCommit(SPager *pPager, TXN *pTxn) {
     return -1;
   }
+  if (pPager->jPageSet) {
+    hashset_destroy(pPager->jPageSet);
+  }
   pPager->inTran = 0;
   return 0;
@@ -375,36 +496,61 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
     return -1;
   }
-  tdb_fd_t jfd = tdbOsOpen(pPager->jFileName, TDB_O_RDWR, 0755);
-  if (jfd == NULL) {
-    return -1;
-  }
+  tdb_fd_t jfd = pPager->jfd;
   ret = tdbGetFileSize(jfd, pPager->pageSize, &journalSize);
   if (ret < 0) {
     return -1;
   }
-  // 1, read pages from jounal file
-  // 2, write original pages to buffered ones
+  u8 *pageBuf = tdbOsCalloc(1, pPager->pageSize);
+  if (pageBuf == NULL) {
+    return -1;
+  }
-  /* TODO: reset the buffered pages instead of releasing them
-  // loop to reset the dirty pages from file
-  for (pgIdx = 0, pPage = pPager->pDirty; pPage != NULL && pgIndex < journalSize; pPage = pPage->pDirtyNext, ++pgIdx) {
+  for (int pgIndex = 0; pgIndex < journalSize; ++pgIndex) {
     // read pgno & the page from journal
     SPgno pgno;
     int ret = tdbOsRead(jfd, &pgno, sizeof(pgno));
     if (ret < 0) {
+      tdbOsFree(pageBuf);
       return -1;
     }
     ret = tdbOsRead(jfd, pageBuf, pPager->pageSize);
     if (ret < 0) {
+      tdbOsFree(pageBuf);
+      return -1;
+    }
+
+    i64 offset = pPager->pageSize * (pgno - 1);
+    if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) {
+      tdbError("failed to lseek fd due to %s. file:%s, offset:%" PRId64, strerror(errno), pPager->dbFileName, offset);
+      terrno = TAOS_SYSTEM_ERROR(errno);
+      tdbOsFree(pageBuf);
+      return -1;
+    }
+
+    ret = tdbOsWrite(pPager->fd, pageBuf, pPager->pageSize);
+    if (ret < 0) {
+      tdbError("failed to write buf due to %s. file: %s, bufsize:%d", strerror(errno), pPager->dbFileName,
+               pPager->pageSize);
+      terrno = TAOS_SYSTEM_ERROR(errno);
+      tdbOsFree(pageBuf);
       return -1;
     }
   }
-  */
+
+  if (tdbOsFSync(pPager->fd) < 0) {
+    tdbError("failed to fsync fd due to %s. dbfile:%s", strerror(errno), pPager->dbFileName);
+    terrno = TAOS_SYSTEM_ERROR(errno);
+    tdbOsFree(pageBuf);
+    return -1;
+  }
+
+  tdbOsFree(pageBuf);
   // 3, release the dirty pages
   SRBTreeIter iter = tRBTreeIterCreate(&pPager->rbt, 1);
   SRBTreeNode *pNode = NULL;
@@ -414,6 +560,7 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
     pPage->isDirty = 0;
     tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
+    hashset_remove(pPager->jPageSet, (void*)((long)TDB_PAGE_PGNO(pPage)));
     tdbPCacheRelease(pPager->pCache, pPage, pTxn);
   }
@@ -422,11 +569,48 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
   // 4, remove the journal file
   tdbOsClose(pPager->jfd);
   (void)tdbOsRemove(pPager->jFileName);
+  hashset_destroy(pPager->jPageSet);
   pPager->inTran = 0;
   return 0;
 }
+
+int tdbPagerFlushPage(SPager *pPager, TXN *pTxn) {
+  SPage *pPage;
+  int ret;
+
+  // loop to write the dirty pages to file
+  SRBTreeIter iter = tRBTreeIterCreate(&pPager->rbt, 1);
+  SRBTreeNode *pNode = NULL;
+  while ((pNode = tRBTreeIterNext(&iter)) != NULL) {
+    pPage = (SPage *)pNode;
+
+    ret = tdbPagerWritePageToDB(pPager, pPage);
+    if (ret < 0) {
+      tdbError("failed to write page to db since %s", tstrerror(terrno));
+      return -1;
+    }
+  }
+
+  tdbTrace("tdbttl commit:%p, %d/%d", pPager, pPager->dbOrigSize, pPager->dbFileSize);
+  pPager->dbOrigSize = pPager->dbFileSize;
+
+  // release the page
+  iter = tRBTreeIterCreate(&pPager->rbt, 1);
+  while ((pNode = tRBTreeIterNext(&iter)) != NULL) {
+    pPage = (SPage *)pNode;
+
+    pPage->isDirty = 0;
+
+    tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
+    tdbPCacheRelease(pPager->pCache, pPage, pTxn);
+  }
+
+  tRBTreeCreate(&pPager->rbt, pageCmpFn);
+
+  return 0;
+}
+
 int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *, int), void *arg,
                       TXN *pTxn) {
   SPage *pPage;
@@ -453,10 +637,8 @@ int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPa
   // fetch a page container
   memcpy(&pgid, pPager->fid, TDB_FILE_ID_LEN);
   pgid.pgno = pgno;
-  pPage = tdbPCacheFetch(pPager->pCache, &pgid, pTxn);
-  if (pPage == NULL) {
-    ASSERT(0);
-    return -1;
+  while ((pPage = tdbPCacheFetch(pPager->pCache, &pgid, pTxn)) == NULL) {
+    tdbPagerFlushPage(pPager, pTxn);
   }
   tdbTrace("tdbttl fetch pager:%p", pPage->pPager);
......
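The pager file above also adds a small open-addressing hash set (hashset_st) keyed by page number; tdbPagerWrite consults it so a page already copied to the journal within the current transaction is not journaled again. Below is a compact standalone sketch of the same probing scheme with a fixed capacity and no rehashing, so it illustrates the idea rather than reproducing the PR's implementation; PageSet and its functions are illustrative names:

#include <stdio.h>
#include <stddef.h>
#include <string.h>

/* Open-addressing set of non-zero page numbers; 0 marks an empty slot.
   The probing constants match the PR's hashset (prime, prime2). */
#define NBITS 4
#define CAP   (1u << NBITS)

static const size_t prime = 39;
static const size_t prime2 = 5009;

typedef struct {
  size_t items[CAP];
  size_t mask;
  size_t nitems;
} PageSet;

static void pageset_init(PageSet *s) {
  memset(s, 0, sizeof(*s));
  s->mask = CAP - 1;
}

/* Returns 1 if newly added, 0 if already present (assumes the set never fills up). */
static int pageset_add(PageSet *s, size_t pgno) {
  size_t h;
  if (pgno == 0) return -1;
  for (h = s->mask & (prime * pgno); s->items[h] != 0; h = s->mask & (h + prime2)) {
    if (s->items[h] == pgno) return 0;
  }
  s->items[h] = pgno;
  s->nitems++;
  return 1;
}

static int pageset_contains(const PageSet *s, size_t pgno) {
  for (size_t h = s->mask & (prime * pgno); s->items[h] != 0; h = s->mask & (h + prime2)) {
    if (s->items[h] == pgno) return 1;
  }
  return 0;
}

int main(void) {
  PageSet set;
  pageset_init(&set);

  /* Journal page 7 only once, the way tdbPagerWrite consults jPageSet. */
  size_t pgno = 7;
  if (!pageset_contains(&set, pgno)) {
    puts("writing page 7 to journal");
    pageset_add(&set, pgno);
  }
  if (!pageset_contains(&set, pgno)) {
    puts("writing page 7 to journal"); /* not reached: already journaled */
  }
  printf("pages journaled: %zu\n", set.nitems);
  return 0;
}

Zero is reserved as the empty-slot marker, which is why page number 0 (and a NULL pointer in the PR's void* interface) cannot be stored.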
@@ -384,6 +384,8 @@ struct STDB {
 #endif
 };
+typedef struct hashset_st *hashset_t;
+
 struct SPager {
   char *dbFileName;
   char *jFileName;
@@ -394,7 +396,8 @@ struct SPager {
   SPCache *pCache;
   SPgno dbFileSize;
   SPgno dbOrigSize;
-  SPage *pDirty;
+  //SPage *pDirty;
+  hashset_t jPageSet;
   SRBTree rbt;
   u8 inTran;
   SPager *pNext;  // used by TDB
......
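For reference, the reworked tdbPagerAbort no longer walks a pDirty list; it rolls back by replaying the journal against the DB file: for each record it reads the page number and the saved page image, seeks to pageSize * (pgno - 1), writes the image back, and syncs the file at the end. A self-contained sketch of that replay using plain stdio on temporary files instead of the tdbOs* wrappers; journal_rollback and the constants here are illustrative, though the pgno-plus-page-image record layout follows the diff:

#include <stdio.h>
#include <stdint.h>

#define PAGE_SIZE 64
typedef uint32_t Pgno; /* stands in for SPgno */

/* Replay npages journal records onto the db file, then flush it. */
static int journal_rollback(FILE *db, FILE *journal, int npages) {
  unsigned char buf[PAGE_SIZE];

  for (int i = 0; i < npages; i++) {
    Pgno pgno;
    if (fread(&pgno, sizeof(pgno), 1, journal) != 1) return -1;
    if (fread(buf, PAGE_SIZE, 1, journal) != 1) return -1;

    /* Page numbers are 1-based, as in the PR: offset = pageSize * (pgno - 1). */
    long offset = (long)PAGE_SIZE * (pgno - 1);
    if (fseek(db, offset, SEEK_SET) != 0) return -1;
    if (fwrite(buf, PAGE_SIZE, 1, db) != 1) return -1;
  }
  return fflush(db); /* the real code fsyncs the db fd here */
}

int main(void) {
  FILE *db = tmpfile(), *journal = tmpfile();
  if (!db || !journal) return 1;

  char orig[PAGE_SIZE] = "original page 1";  /* image saved in the journal */
  char dirty[PAGE_SIZE] = "modified page 1"; /* what the db currently holds */
  Pgno pgno = 1;

  fwrite(&pgno, sizeof(pgno), 1, journal);
  fwrite(orig, PAGE_SIZE, 1, journal);
  fwrite(dirty, PAGE_SIZE, 1, db);
  rewind(journal);

  if (journal_rollback(db, journal, 1) != 0) return 1;

  char check[PAGE_SIZE];
  rewind(db);
  if (fread(check, PAGE_SIZE, 1, db) != 1) return 1;
  printf("page 1 after rollback: %s\n", check); /* prints: original page 1 */
  return 0;
}

This pairs with tdbPagerFlushPage: because a fetch may now push dirty pages to the DB file before commit, abort restores the original images from the journal rather than simply dropping buffered pages.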