提交 c17f8244 编写于 作者: M Minglei Jin

enh(tdb): new interface of meta prep async commit

上级 b8dd3843
...@@ -102,6 +102,7 @@ int metaClose(SMeta* pMeta); ...@@ -102,6 +102,7 @@ int metaClose(SMeta* pMeta);
int metaBegin(SMeta* pMeta, int8_t fromSys); int metaBegin(SMeta* pMeta, int8_t fromSys);
int metaCommit(SMeta* pMeta); int metaCommit(SMeta* pMeta);
int metaFinishCommit(SMeta* pMeta); int metaFinishCommit(SMeta* pMeta);
int metaPrepareAsyncCommit(SMeta* pMeta);
int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList); int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList);
......
...@@ -35,6 +35,7 @@ int metaBegin(SMeta *pMeta, int8_t fromSys) { ...@@ -35,6 +35,7 @@ int metaBegin(SMeta *pMeta, int8_t fromSys) {
// commit the meta txn // commit the meta txn
int metaCommit(SMeta *pMeta) { return tdbCommit(pMeta->pEnv, &pMeta->txn); } int metaCommit(SMeta *pMeta) { return tdbCommit(pMeta->pEnv, &pMeta->txn); }
int metaFinishCommit(SMeta *pMeta) { return tdbPostCommit(pMeta->pEnv, &pMeta->txn); } int metaFinishCommit(SMeta *pMeta) { return tdbPostCommit(pMeta->pEnv, &pMeta->txn); }
int metaPrepareAsyncCommit(SMeta *pMeta) { return tdbPrepareAsyncCommit(pMeta->pEnv, &pMeta->txn); }
// abort the meta txn // abort the meta txn
int metaAbort(SMeta *pMeta) { return tdbAbort(pMeta->pEnv, &pMeta->txn); } int metaAbort(SMeta *pMeta) { return tdbAbort(pMeta->pEnv, &pMeta->txn); }
...@@ -36,6 +36,7 @@ int32_t tdbClose(TDB *pDb); ...@@ -36,6 +36,7 @@ int32_t tdbClose(TDB *pDb);
int32_t tdbBegin(TDB *pDb, TXN *pTxn); int32_t tdbBegin(TDB *pDb, TXN *pTxn);
int32_t tdbCommit(TDB *pDb, TXN *pTxn); int32_t tdbCommit(TDB *pDb, TXN *pTxn);
int32_t tdbPostCommit(TDB *pDb, TXN *pTxn); int32_t tdbPostCommit(TDB *pDb, TXN *pTxn);
int32_t tdbPrepareAsyncCommit(TDB *pDb, TXN *pTxn);
int32_t tdbAbort(TDB *pDb, TXN *pTxn); int32_t tdbAbort(TDB *pDb, TXN *pTxn);
int32_t tdbAlter(TDB *pDb, int pages); int32_t tdbAlter(TDB *pDb, int pages);
......
...@@ -138,7 +138,24 @@ int32_t tdbPostCommit(TDB *pDb, TXN *pTxn) { ...@@ -138,7 +138,24 @@ int32_t tdbPostCommit(TDB *pDb, TXN *pTxn) {
for (pPager = pDb->pgrList; pPager; pPager = pPager->pNext) { for (pPager = pDb->pgrList; pPager; pPager = pPager->pNext) {
ret = tdbPagerPostCommit(pPager, pTxn); ret = tdbPagerPostCommit(pPager, pTxn);
if (ret < 0) { if (ret < 0) {
tdbError("failed to commit pager since %s. dbName:%s, txnId:%" PRId64, tstrerror(terrno), pDb->dbName, pTxn->txnId); tdbError("failed to commit pager since %s. dbName:%s, txnId:%" PRId64, tstrerror(terrno), pDb->dbName,
pTxn->txnId);
return -1;
}
}
return 0;
}
int32_t tdbPrepareAsyncCommit(TDB *pDb, TXN *pTxn) {
SPager *pPager;
int ret;
for (pPager = pDb->pgrList; pPager; pPager = pPager->pNext) {
ret = tdbPagerPrepareAsyncCommit(pPager, pTxn);
if (ret < 0) {
tdbError("failed to commit pager since %s. dbName:%s, txnId:%" PRId64, tstrerror(terrno), pDb->dbName,
pTxn->txnId);
return -1; return -1;
} }
} }
......
...@@ -287,6 +287,10 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { ...@@ -287,6 +287,10 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
return -1; return -1;
} }
return 0;
}
int tdbPagerPostCommit(SPager *pPager, TXN *pTxn) {
// remove the journal file // remove the journal file
if (tdbOsClose(pPager->jfd) < 0) { if (tdbOsClose(pPager->jfd) < 0) {
tdbError("failed to close jfd due to %s. file:%s", strerror(errno), pPager->jFileName); tdbError("failed to close jfd due to %s. file:%s", strerror(errno), pPager->jFileName);
...@@ -305,15 +309,54 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { ...@@ -305,15 +309,54 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) {
return 0; return 0;
} }
int tdbPagerPostCommit(SPager *pPager, TXN *pTxn) { int tdbPagerPrepareAsyncCommit(SPager *pPager, TXN *pTxn) {
if (tdbOsRemove(pPager->jFileName) < 0 && errno != ENOENT) { SPage *pPage;
tdbError("failed to remove file due to %s. file:%s", strerror(errno), pPager->jFileName); int ret;
// sync the journal file
ret = tdbOsFSync(pPager->jfd);
if (ret < 0) {
tdbError("failed to fsync jfd due to %s. jFileName:%s", strerror(errno), pPager->jFileName);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
} }
pPager->inTran = 0; // loop to write the dirty pages to file
SRBTreeIter iter = tRBTreeIterCreate(&pPager->rbt, 1);
SRBTreeNode *pNode = NULL;
while ((pNode = tRBTreeIterNext(&iter)) != NULL) {
pPage = (SPage *)pNode;
if (pPage->isLocal) continue;
ret = tdbPagerWritePageToDB(pPager, pPage);
if (ret < 0) {
tdbError("failed to write page to db since %s", tstrerror(terrno));
return -1;
}
}
tdbTrace("tdbttl commit:%p, %d/%d", pPager, pPager->dbOrigSize, pPager->dbFileSize);
pPager->dbOrigSize = pPager->dbFileSize;
// release the page
iter = tRBTreeIterCreate(&pPager->rbt, 1);
while ((pNode = tRBTreeIterNext(&iter)) != NULL) {
pPage = (SPage *)pNode;
if (pPage->isLocal) continue;
pPage->isDirty = 0;
tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage);
tdbPCacheRelease(pPager->pCache, pPage, pTxn);
}
/*
tRBTreeCreate(&pPager->rbt, pageCmpFn);
// sync the db file
if (tdbOsFSync(pPager->fd) < 0) {
tdbError("failed to fsync fd due to %s. file:%s", strerror(errno), pPager->dbFileName);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
*/
return 0; return 0;
} }
......
...@@ -191,6 +191,7 @@ int tdbPagerWrite(SPager *pPager, SPage *pPage); ...@@ -191,6 +191,7 @@ int tdbPagerWrite(SPager *pPager, SPage *pPage);
int tdbPagerBegin(SPager *pPager, TXN *pTxn); int tdbPagerBegin(SPager *pPager, TXN *pTxn);
int tdbPagerCommit(SPager *pPager, TXN *pTxn); int tdbPagerCommit(SPager *pPager, TXN *pTxn);
int tdbPagerPostCommit(SPager *pPager, TXN *pTxn); int tdbPagerPostCommit(SPager *pPager, TXN *pTxn);
int tdbPagerPrepareAsyncCommit(SPager *pPager, TXN *pTxn);
int tdbPagerAbort(SPager *pPager, TXN *pTxn); int tdbPagerAbort(SPager *pPager, TXN *pTxn);
int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *, int), void *arg, int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPage)(SPage *, void *, int), void *arg,
TXN *pTxn); TXN *pTxn);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册