提交 e090cd9d 编写于 作者: M Minglei Jin

tdb/pager: move journal to txn

上级 a780305e
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#define _TD_TDB_H_ #define _TD_TDB_H_
#include "os.h" #include "os.h"
#include "tdbOs.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -78,12 +79,17 @@ int32_t tdbTxnClose(TXN *pTxn); ...@@ -78,12 +79,17 @@ int32_t tdbTxnClose(TXN *pTxn);
// other // other
void tdbFree(void *); void tdbFree(void *);
typedef struct hashset_st *hashset_t;
struct STxn { struct STxn {
int flags; int flags;
int64_t txnId; int64_t txnId;
void *(*xMalloc)(void *, size_t); void *(*xMalloc)(void *, size_t);
void (*xFree)(void *, void *); void (*xFree)(void *, void *);
void *xArg; void *xArg;
tdb_fd_t jfd;
hashset_t jPageSet;
int preped;
}; };
// error code // error code
......
...@@ -69,14 +69,15 @@ int tdbPageCreate(int pageSize, SPage **ppPage, void *(*xMalloc)(void *, size_t) ...@@ -69,14 +69,15 @@ int tdbPageCreate(int pageSize, SPage **ppPage, void *(*xMalloc)(void *, size_t)
*ppPage = pPage; *ppPage = pPage;
tdbDebug("page/create: %p %p", pPage, xMalloc); tdbTrace("page/create: %p/%d %p", pPage, pPage->id, xMalloc);
return 0; return 0;
} }
int tdbPageDestroy(SPage *pPage, void (*xFree)(void *arg, void *ptr), void *arg) { int tdbPageDestroy(SPage *pPage, void (*xFree)(void *arg, void *ptr), void *arg) {
u8 *ptr; u8 *ptr;
tdbDebug("page/destroy: %p %p", pPage, xFree); tdbTrace("page/destroy: %p/%d %p", pPage, pPage->id, xFree);
ASSERT(!pPage->isDirty);
ASSERT(xFree); ASSERT(xFree);
for (int iOvfl = 0; iOvfl < pPage->nOverflow; iOvfl++) { for (int iOvfl = 0; iOvfl < pPage->nOverflow; iOvfl++) {
......
...@@ -219,9 +219,11 @@ int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager) { ...@@ -219,9 +219,11 @@ int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager) {
int tdbPagerClose(SPager *pPager) { int tdbPagerClose(SPager *pPager) {
if (pPager) { if (pPager) {
/*
if (pPager->inTran) { if (pPager->inTran) {
tdbOsClose(pPager->jfd); tdbOsClose(pPager->jfd);
} }
*/
tdbOsClose(pPager->fd); tdbOsClose(pPager->fd);
tdbOsFree(pPager); tdbOsFree(pPager);
} }
...@@ -232,16 +234,7 @@ int tdbPagerWrite(SPager *pPager, SPage *pPage) { ...@@ -232,16 +234,7 @@ int tdbPagerWrite(SPager *pPager, SPage *pPage) {
int ret; int ret;
SPage **ppPage; SPage **ppPage;
ASSERT(pPager->inTran); // ASSERT(pPager->inTran);
#if 0
if (pPager->inTran == 0) {
ret = tdbPagerBegin(pPager);
if (ret < 0) {
return -1;
}
}
#endif
if (pPage->isDirty) return 0; if (pPage->isDirty) return 0;
// ref page one more time so the page will not be released // ref page one more time so the page will not be released
...@@ -271,15 +264,16 @@ int tdbPagerWrite(SPager *pPager, SPage *pPage) { ...@@ -271,15 +264,16 @@ int tdbPagerWrite(SPager *pPager, SPage *pPage) {
// Write page to journal if necessary // Write page to journal if necessary
if (TDB_PAGE_PGNO(pPage) <= pPager->dbOrigSize && if (TDB_PAGE_PGNO(pPage) <= pPager->dbOrigSize &&
(pPager->jPageSet == NULL || !hashset_contains(pPager->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage))))) { (pPager->pActiveTxn->jPageSet == NULL ||
!hashset_contains(pPager->pActiveTxn->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage))))) {
ret = tdbPagerWritePageToJournal(pPager, pPage); ret = tdbPagerWritePageToJournal(pPager, pPage);
if (ret < 0) { if (ret < 0) {
tdbError("failed to write page to journal since %s", tstrerror(terrno)); tdbError("failed to write page to journal since %s", tstrerror(terrno));
return -1; return -1;
} }
if (pPager->jPageSet) { if (pPager->pActiveTxn->jPageSet) {
hashset_add(pPager->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage))); hashset_add(pPager->pActiveTxn->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage)));
} }
} }
...@@ -287,23 +281,28 @@ int tdbPagerWrite(SPager *pPager, SPage *pPage) { ...@@ -287,23 +281,28 @@ int tdbPagerWrite(SPager *pPager, SPage *pPage) {
} }
int tdbPagerBegin(SPager *pPager, TXN *pTxn) { int tdbPagerBegin(SPager *pPager, TXN *pTxn) {
/*
if (pPager->inTran) { if (pPager->inTran) {
return 0; return 0;
} }
*/
// Open the journal // Open the journal
pPager->jfd = tdbOsOpen(pPager->jFileName, TDB_O_CREAT | TDB_O_RDWR, 0755); char jTxnFileName[TDB_FILENAME_LEN];
if (TDB_FD_INVALID(pPager->jfd)) { sprintf(jTxnFileName, "%s.%" PRId64, pPager->jFileName, pTxn->txnId);
pTxn->jfd = tdbOsOpen(jTxnFileName, TDB_O_CREAT | TDB_O_RDWR, 0755);
if (TDB_FD_INVALID(pTxn->jfd)) {
tdbError("failed to open file due to %s. jFileName:%s", strerror(errno), pPager->jFileName); tdbError("failed to open file due to %s. jFileName:%s", strerror(errno), pPager->jFileName);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
pPager->jPageSet = hashset_create(); pTxn->jPageSet = hashset_create();
ASSERT(pPager->pActiveTxn->preped == 1);
pPager->pActiveTxn = pTxn;
// TODO: write the size of the file // TODO: write the size of the file
/*
pPager->inTran = 1; pPager->inTran = 1;
*/
return 0; return 0;
} }
...@@ -312,9 +311,9 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { ...@@ -312,9 +311,9 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
int ret; int ret;
// sync the journal file // sync the journal file
ret = tdbOsFSync(pPager->jfd); ret = tdbOsFSync(pTxn->jfd);
if (ret < 0) { if (ret < 0) {
tdbError("failed to fsync jfd due to %s. jFileName:%s", strerror(errno), pPager->jFileName); tdbError("failed to fsync: %s. jFileName:%s, %" PRId64, strerror(errno), pPager->jFileName, pTxn->txnId);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
...@@ -344,8 +343,8 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { ...@@ -344,8 +343,8 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
pPage->isDirty = 0; pPage->isDirty = 0;
tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage); tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
if (pPager->jPageSet) { if (pTxn->jPageSet) {
hashset_remove(pPager->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage))); hashset_remove(pTxn->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage)));
} }
tdbPCacheRelease(pPager->pCache, pPage, pTxn); tdbPCacheRelease(pPager->pCache, pPage, pTxn);
} }
...@@ -364,35 +363,39 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { ...@@ -364,35 +363,39 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
} }
int tdbPagerPostCommit(SPager *pPager, TXN *pTxn) { int tdbPagerPostCommit(SPager *pPager, TXN *pTxn) {
char jTxnFileName[TDB_FILENAME_LEN];
sprintf(jTxnFileName, "%s.%" PRId64, pPager->jFileName, pTxn->txnId);
// remove the journal file // remove the journal file
if (tdbOsClose(pPager->jfd) < 0) { if (tdbOsClose(pTxn->jfd) < 0) {
tdbError("failed to close jfd due to %s. file:%s", strerror(errno), pPager->jFileName); tdbError("failed to close jfd: %s. file:%s, %" PRId64, strerror(errno), pPager->jFileName, pTxn->txnId);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
if (tdbOsRemove(pPager->jFileName) < 0 && errno != ENOENT) { if (tdbOsRemove(jTxnFileName) < 0 && errno != ENOENT) {
tdbError("failed to remove file due to %s. file:%s", strerror(errno), pPager->jFileName); tdbError("failed to remove file due to %s. file:%s", strerror(errno), jTxnFileName);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
if (pPager->jPageSet) { if (pTxn->jPageSet) {
hashset_destroy(pPager->jPageSet); hashset_destroy(pTxn->jPageSet);
} }
pPager->inTran = 0; // pPager->inTran = 0;
return 0; return 0;
} }
int tdbPagerPrepareAsyncCommit(SPager *pPager, TXN *pTxn) { int tdbPagerPrepareAsyncCommit(SPager *pPager, TXN *pTxn) {
SPage *pPage; SPage *pPage;
SPgno maxPgno = pPager->dbOrigSize;
int ret; int ret;
// sync the journal file // sync the journal file
ret = tdbOsFSync(pPager->jfd); ret = tdbOsFSync(pTxn->jfd);
if (ret < 0) { if (ret < 0) {
tdbError("failed to fsync jfd due to %s. jFileName:%s", strerror(errno), pPager->jFileName); tdbError("failed to fsync jfd: %s. jfile:%s, %" PRId64, strerror(errno), pPager->jFileName, pTxn->txnId);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
...@@ -403,6 +406,11 @@ int tdbPagerPrepareAsyncCommit(SPager *pPager, TXN *pTxn) { ...@@ -403,6 +406,11 @@ int tdbPagerPrepareAsyncCommit(SPager *pPager, TXN *pTxn) {
while ((pNode = tRBTreeIterNext(&iter)) != NULL) { while ((pNode = tRBTreeIterNext(&iter)) != NULL) {
pPage = (SPage *)pNode; pPage = (SPage *)pNode;
if (pPage->isLocal) continue; if (pPage->isLocal) continue;
SPgno pgno = TDB_PAGE_PGNO(pPage);
if (pgno > maxPgno) {
maxPgno = pgno;
}
ret = tdbPagerPWritePageToDB(pPager, pPage); ret = tdbPagerPWritePageToDB(pPager, pPage);
if (ret < 0) { if (ret < 0) {
tdbError("failed to write page to db since %s", tstrerror(terrno)); tdbError("failed to write page to db since %s", tstrerror(terrno));
...@@ -411,7 +419,8 @@ int tdbPagerPrepareAsyncCommit(SPager *pPager, TXN *pTxn) { ...@@ -411,7 +419,8 @@ int tdbPagerPrepareAsyncCommit(SPager *pPager, TXN *pTxn) {
} }
tdbTrace("tdbttl commit:%p, %d/%d", pPager, pPager->dbOrigSize, pPager->dbFileSize); tdbTrace("tdbttl commit:%p, %d/%d", pPager, pPager->dbOrigSize, pPager->dbFileSize);
pPager->dbOrigSize = pPager->dbFileSize; pPager->dbOrigSize = maxPgno;
// pPager->dbOrigSize = pPager->dbFileSize;
// release the page // release the page
iter = tRBTreeIterCreate(&pPager->rbt, 1); iter = tRBTreeIterCreate(&pPager->rbt, 1);
...@@ -423,6 +432,8 @@ int tdbPagerPrepareAsyncCommit(SPager *pPager, TXN *pTxn) { ...@@ -423,6 +432,8 @@ int tdbPagerPrepareAsyncCommit(SPager *pPager, TXN *pTxn) {
tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage); tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
tdbPCacheRelease(pPager->pCache, pPage, pTxn); tdbPCacheRelease(pPager->pCache, pPage, pTxn);
} }
pTxn->preped = 1;
/* /*
tdbTrace("reset dirty tree: %p", &pPager->rbt); tdbTrace("reset dirty tree: %p", &pPager->rbt);
tRBTreeCreate(&pPager->rbt, pageCmpFn); tRBTreeCreate(&pPager->rbt, pageCmpFn);
...@@ -444,15 +455,15 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) { ...@@ -444,15 +455,15 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
SPgno journalSize = 0; SPgno journalSize = 0;
int ret; int ret;
// 0, sync the journal file // sync the journal file
ret = tdbOsFSync(pPager->jfd); ret = tdbOsFSync(pTxn->jfd);
if (ret < 0) { if (ret < 0) {
tdbError("failed to fsync jfd due to %s. file:%s", strerror(errno), pPager->jFileName); tdbError("failed to fsync jfd: %s. jfile:%s, %" PRId64, strerror(errno), pPager->jFileName, pTxn->txnId);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
tdb_fd_t jfd = pPager->jfd; tdb_fd_t jfd = pTxn->jfd;
ret = tdbGetFileSize(jfd, pPager->pageSize, &journalSize); ret = tdbGetFileSize(jfd, pPager->pageSize, &journalSize);
if (ret < 0) { if (ret < 0) {
...@@ -516,7 +527,7 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) { ...@@ -516,7 +527,7 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
pPage->isDirty = 0; pPage->isDirty = 0;
tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage); tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
hashset_remove(pPager->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage))); hashset_remove(pTxn->jPageSet, (void *)((long)TDB_PAGE_PGNO(pPage)));
tdbPCacheRelease(pPager->pCache, pPage, pTxn); tdbPCacheRelease(pPager->pCache, pPage, pTxn);
} }
...@@ -524,11 +535,24 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) { ...@@ -524,11 +535,24 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
tRBTreeCreate(&pPager->rbt, pageCmpFn); tRBTreeCreate(&pPager->rbt, pageCmpFn);
// 4, remove the journal file // 4, remove the journal file
tdbOsClose(pPager->jfd); if (tdbOsClose(pTxn->jfd) < 0) {
(void)tdbOsRemove(pPager->jFileName); tdbError("failed to close jfd: %s. file:%s, %" PRId64, strerror(errno), pPager->jFileName, pTxn->txnId);
hashset_destroy(pPager->jPageSet); terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
char jTxnFileName[TDB_FILENAME_LEN];
sprintf(jTxnFileName, "%s.%" PRId64, pPager->jFileName, pTxn->txnId);
if (tdbOsRemove(jTxnFileName) < 0 && errno != ENOENT) {
tdbError("failed to remove file due to %s. file:%s", strerror(errno), jTxnFileName);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
hashset_destroy(pTxn->jPageSet);
pPager->inTran = 0; // pPager->inTran = 0;
return 0; return 0;
} }
...@@ -751,17 +775,18 @@ static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage) { ...@@ -751,17 +775,18 @@ static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage) {
pgno = TDB_PAGE_PGNO(pPage); pgno = TDB_PAGE_PGNO(pPage);
ret = tdbOsWrite(pPager->jfd, &pgno, sizeof(pgno)); ret = tdbOsWrite(pPager->pActiveTxn->jfd, &pgno, sizeof(pgno));
if (ret < 0) { if (ret < 0) {
tdbError("failed to write pgno due to %s. file:%s, pgno:%u", strerror(errno), pPager->jFileName, pgno); tdbError("failed to write pgno due to %s. file:%s, pgno:%u, txnId:%" PRId64, strerror(errno), pPager->jFileName,
pgno, pPager->pActiveTxn->txnId);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
ret = tdbOsWrite(pPager->jfd, pPage->pData, pPage->pageSize); ret = tdbOsWrite(pPager->pActiveTxn->jfd, pPage->pData, pPage->pageSize);
if (ret < 0) { if (ret < 0) {
tdbError("failed to write page data due to %s. file:%s, pageSize:%ld", strerror(errno), pPager->jFileName, tdbError("failed to write page data due to %s. file:%s, pageSize:%d, txnId:%" PRId64, strerror(errno),
(long)pPage->pageSize); pPager->jFileName, pPage->pageSize, pPager->pActiveTxn->txnId);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
......
...@@ -385,24 +385,21 @@ struct STDB { ...@@ -385,24 +385,21 @@ struct STDB {
int64_t txnId; int64_t txnId;
}; };
typedef struct hashset_st *hashset_t;
struct SPager { struct SPager {
char *dbFileName; char *dbFileName;
char *jFileName; char *jFileName;
int pageSize; int pageSize;
uint8_t fid[TDB_FILE_ID_LEN]; uint8_t fid[TDB_FILE_ID_LEN];
tdb_fd_t fd; tdb_fd_t fd;
tdb_fd_t jfd;
SPCache *pCache; SPCache *pCache;
SPgno dbFileSize; SPgno dbFileSize;
SPgno dbOrigSize; SPgno dbOrigSize;
// SPage *pDirty; // SPage *pDirty;
hashset_t jPageSet; SRBTree rbt;
SRBTree rbt; // u8 inTran;
u8 inTran; TXN *pActiveTxn;
SPager *pNext; // used by TDB SPager *pNext; // used by TDB
SPager *pHashNext; // used by TDB SPager *pHashNext; // used by TDB
#ifdef USE_MAINDB #ifdef USE_MAINDB
TDB *pEnv; TDB *pEnv;
#endif #endif
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册