diff --git a/source/libs/tdb/src/db/tdbPCache.c b/source/libs/tdb/src/db/tdbPCache.c index bdbd6c2f3dbb904eae7b91c6cb935caa9b20aa2c..e7254c8bc6933b2c10878e1a9046de1e9f0889e6 100644 --- a/source/libs/tdb/src/db/tdbPCache.c +++ b/source/libs/tdb/src/db/tdbPCache.c @@ -169,7 +169,7 @@ int tdbPCacheAlter(SPCache *pCache, int32_t nPage) { SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, TXN *pTxn) { SPage *pPage; - i32 nRef; + i32 nRef = 0; tdbPCacheLock(pCache); @@ -178,14 +178,17 @@ SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, TXN *pTxn) { nRef = tdbRefPage(pPage); } - ASSERT(pPage); - tdbPCacheUnlock(pCache); // printf("thread %" PRId64 " fetch page %d pgno %d pPage %p nRef %d\n", taosGetSelfPthreadId(), pPage->id, // TDB_PAGE_PGNO(pPage), pPage, nRef); - tdbDebug("pcache/fetch page %p/%d/%d/%d", pPage, TDB_PAGE_PGNO(pPage), pPage->id, nRef); + if (pPage) { + tdbDebug("pcache/fetch page %p/%d/%d/%d", pPage, TDB_PAGE_PGNO(pPage), pPage->id, nRef); + } else { + tdbDebug("pcache/fetch page %p", pPage); + } + return pPage; } @@ -266,7 +269,7 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn) } // 4. Try a create new page - if (!pPage) { + if (!pPage && pTxn->xMalloc != NULL) { ret = tdbPageCreate(pCache->szPage, &pPage, pTxn->xMalloc, pTxn->xArg); if (ret < 0 || pPage == NULL) { // TODO diff --git a/source/libs/tdb/src/db/tdbPager.c b/source/libs/tdb/src/db/tdbPager.c index c3ae1dc7395cbef32b153741b7548d365203eb4e..abbad06515b75b496de28c2b82c4bdf9cca2b53f 100644 --- a/source/libs/tdb/src/db/tdbPager.c +++ b/source/libs/tdb/src/db/tdbPager.c @@ -27,6 +27,116 @@ typedef struct { TDB_STATIC_ASSERT(sizeof(SFileHdr) == 128, "Size of file header is not correct"); +struct hashset_st { + size_t nbits; + size_t mask; + size_t capacity; + size_t *items; + size_t nitems; + double load_factor; +}; + +static const unsigned int prime = 39; +static const unsigned int prime2 = 5009; + +hashset_t hashset_create(void) { + hashset_t set = tdbOsCalloc(1, sizeof(struct hashset_st)); + if (!set) { + return NULL; + } + + set->nbits = 4; + set->capacity = (size_t)(1 << set->nbits); + set->items = tdbOsCalloc(set->capacity, sizeof(size_t)); + if (!set->items) { + tdbOsFree(set); + return NULL; + } + set->mask = set->capacity - 1; + set->nitems = 0; + + set->load_factor = 0.75; + + return set; +} + +void hashset_destroy(hashset_t set) { + if (set) { + tdbOsFree(set->items); + tdbOsFree(set); + } +} + +int hashset_add_member(hashset_t set, void *item) { + size_t value = (size_t) item; + size_t h; + + if (value == 0) { + return -1; + } + + for (h = set->mask & (prime * value); set->items[h] != 0; h = set->mask & (h + prime2)) { + if (set->items[h] == value) { + return 0; + } + } + + set->items[h] = value; + ++set->nitems; + return 1; +} + +int hashset_add(hashset_t set, void *item) { + int ret = hashset_add_member(set, item); + + size_t old_capacity = set->capacity; + if (set->nitems >= (double)old_capacity * set->load_factor) { + size_t *old_items = set->items; + ++set->nbits; + set->capacity = (size_t)(1 << set->nbits); + set->mask = set->capacity - 1; + + set->items = tdbOsCalloc(set->capacity, sizeof(size_t)); + if (!set->items) { + return -1; + } + + set->nitems = 0; + for (size_t i = 0; i < old_capacity; ++i) { + hashset_add_member(set, (void*)old_items[i]); + } + tdbOsFree(old_items); + } + + return ret; +} + +int hashset_remove(hashset_t set, void *item) { + size_t value = (size_t) item; + + for (size_t h = set->mask & (prime * value); set->items[h] != 0; h = set->mask & (h + prime2)) { + if (set->items[h] == value) { + set->items[h] = 0; + --set->nitems; + return 1; + } + } + + return 0; +} + +int hashset_contains(hashset_t set, void *item) { + size_t value = (size_t) item; + + for (size_t h = set->mask & (prime * value); set->items[h] != 0; h = set->mask & (h + prime2)) { + if (set->items[h] == value) { + return 1; + } + } + + return 0; +} + #define TDB_PAGE_INITIALIZED(pPage) ((pPage)->pPager != NULL) static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage *, void *, int), void *arg, @@ -209,12 +319,16 @@ int tdbPagerWrite(SPager *pPager, SPage *pPage) { tRBTreePut(&pPager->rbt, (SRBTreeNode *)pPage); // Write page to journal if neccessary - if (TDB_PAGE_PGNO(pPage) <= pPager->dbOrigSize) { + if (TDB_PAGE_PGNO(pPage) <= pPager->dbOrigSize && (pPager->jPageSet == NULL || !hashset_contains(pPager->jPageSet, (void*)((long)TDB_PAGE_PGNO(pPage))))) { ret = tdbPagerWritePageToJournal(pPager, pPage); if (ret < 0) { tdbError("failed to write page to journal since %s", tstrerror(terrno)); return -1; } + + if (pPager->jPageSet) { + hashset_add(pPager->jPageSet, (void*)((long)TDB_PAGE_PGNO(pPage))); + } } return 0; @@ -233,6 +347,7 @@ int tdbPagerBegin(SPager *pPager, TXN *pTxn) { return -1; } + pPager->jPageSet = hashset_create(); // TODO: write the size of the file pPager->inTran = 1; @@ -275,6 +390,9 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { pPage->isDirty = 0; tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage); + if (pPager->jPageSet) { + hashset_remove(pPager->jPageSet, (void*)((long)TDB_PAGE_PGNO(pPage))); + } tdbPCacheRelease(pPager->pCache, pPage, pTxn); } @@ -304,6 +422,9 @@ int tdbPagerPostCommit(SPager *pPager, TXN *pTxn) { return -1; } + if (pPager->jPageSet) { + hashset_destroy(pPager->jPageSet); + } pPager->inTran = 0; return 0; @@ -375,36 +496,61 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) { return -1; } - tdb_fd_t jfd = tdbOsOpen(pPager->jFileName, TDB_O_RDWR, 0755); - if (jfd == NULL) { - return -1; - } + tdb_fd_t jfd = pPager->jfd; ret = tdbGetFileSize(jfd, pPager->pageSize, &journalSize); if (ret < 0) { return -1; } - // 1, read pages from jounal file - // 2, write original pages to buffered ones + u8 *pageBuf = tdbOsCalloc(1, pPager->pageSize); + if (pageBuf == NULL) { + return -1; + } - /* TODO: reset the buffered pages instead of releasing them - // loop to reset the dirty pages from file - for (pgIdx = 0, pPage = pPager->pDirty; pPage != NULL && pgIndex < journalSize; pPage = pPage->pDirtyNext, ++pgIdx) { + for (int pgIndex = 0; pgIndex < journalSize; ++pgIndex) { // read pgno & the page from journal SPgno pgno; int ret = tdbOsRead(jfd, &pgno, sizeof(pgno)); if (ret < 0) { + tdbOsFree(pageBuf); return -1; } ret = tdbOsRead(jfd, pageBuf, pPager->pageSize); if (ret < 0) { + tdbOsFree(pageBuf); + return -1; + } + + i64 offset = pPager->pageSize * (pgno - 1); + if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) { + tdbError("failed to lseek fd due to %s. file:%s, offset:%" PRId64, strerror(errno), pPager->dbFileName, offset); + terrno = TAOS_SYSTEM_ERROR(errno); + tdbOsFree(pageBuf); + return -1; + } + + ret = tdbOsWrite(pPager->fd, pageBuf, pPager->pageSize); + if (ret < 0) { + tdbError("failed to write buf due to %s. file: %s, bufsize:%d", strerror(errno), pPager->dbFileName, + pPager->pageSize); + terrno = TAOS_SYSTEM_ERROR(errno); + tdbOsFree(pageBuf); return -1; } } - */ + + if (tdbOsFSync(pPager->fd) < 0) { + tdbError("failed to fsync fd due to %s. dbfile:%s", strerror(errno), pPager->dbFileName); + terrno = TAOS_SYSTEM_ERROR(errno); + tdbOsFree(pageBuf); + return -1; + } + + tdbOsFree(pageBuf); + // 3, release the dirty pages SRBTreeIter iter = tRBTreeIterCreate(&pPager->rbt, 1); SRBTreeNode *pNode = NULL; @@ -414,6 +560,7 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) { pPage->isDirty = 0; tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage); + hashset_remove(pPager->jPageSet, (void*)((long)TDB_PAGE_PGNO(pPage))); tdbPCacheRelease(pPager->pCache, pPage, pTxn); } @@ -422,11 +569,48 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) { // 4, remove the journal file tdbOsClose(pPager->jfd); (void)tdbOsRemove(pPager->jFileName); + hashset_destroy(pPager->jPageSet); + pPager->inTran = 0; return 0; } +int tdbPagerFlushPage(SPager *pPager, TXN *pTxn) { + SPage *pPage; + int ret; + + // loop to write the dirty pages to file + SRBTreeIter iter = tRBTreeIterCreate(&pPager->rbt, 1); + SRBTreeNode *pNode = NULL; + while ((pNode = tRBTreeIterNext(&iter)) != NULL) { + pPage = (SPage *)pNode; + ret = tdbPagerWritePageToDB(pPager, pPage); + if (ret < 0) { + tdbError("failed to write page to db since %s", tstrerror(terrno)); + return -1; + } + } + + tdbTrace("tdbttl commit:%p, %d/%d", pPager, pPager->dbOrigSize, pPager->dbFileSize); + pPager->dbOrigSize = pPager->dbFileSize; + + // release the page + iter = tRBTreeIterCreate(&pPager->rbt, 1); + while ((pNode = tRBTreeIterNext(&iter)) != NULL) { + pPage = (SPage *)pNode; + + pPage->isDirty = 0; + + tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage); + tdbPCacheRelease(pPager->pCache, pPage, pTxn); + } + + tRBTreeCreate(&pPager->rbt, pageCmpFn); + + return 0; +} + int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *, int), void *arg, TXN *pTxn) { SPage *pPage; @@ -453,10 +637,8 @@ int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPa // fetch a page container memcpy(&pgid, pPager->fid, TDB_FILE_ID_LEN); pgid.pgno = pgno; - pPage = tdbPCacheFetch(pPager->pCache, &pgid, pTxn); - if (pPage == NULL) { - ASSERT(0); - return -1; + while ((pPage = tdbPCacheFetch(pPager->pCache, &pgid, pTxn)) == NULL) { + tdbPagerFlushPage(pPager, pTxn); } tdbTrace("tdbttl fetch pager:%p", pPage->pPager); diff --git a/source/libs/tdb/src/inc/tdbInt.h b/source/libs/tdb/src/inc/tdbInt.h index e5ece98b28d24eca7c2f9d72f1a72888807b6bb1..731b1927e70382680119afbc64d333e4e841a8e7 100644 --- a/source/libs/tdb/src/inc/tdbInt.h +++ b/source/libs/tdb/src/inc/tdbInt.h @@ -384,6 +384,8 @@ struct STDB { #endif }; +typedef struct hashset_st *hashset_t; + struct SPager { char *dbFileName; char *jFileName; @@ -394,7 +396,8 @@ struct SPager { SPCache *pCache; SPgno dbFileSize; SPgno dbOrigSize; - SPage *pDirty; + //SPage *pDirty; + hashset_t jPageSet; SRBTree rbt; u8 inTran; SPager *pNext; // used by TDB