提交 5f2b393c 编写于 作者: M Minglei Jin

meta/snapshot: use nil heap and abort tdb when rollback

上级 7b20c09f
...@@ -101,6 +101,10 @@ typedef struct SMCtbCursor SMCtbCursor; ...@@ -101,6 +101,10 @@ typedef struct SMCtbCursor SMCtbCursor;
typedef struct SMStbCursor SMStbCursor; typedef struct SMStbCursor SMStbCursor;
typedef struct STbUidStore STbUidStore; typedef struct STbUidStore STbUidStore;
#define META_BEGIN_HEAP_BUFFERPOOL 0
#define META_BEGIN_HEAP_OS 1
#define META_BEGIN_HEAP_NIL 2
int metaOpen(SVnode* pVnode, SMeta** ppMeta, int8_t rollback); int metaOpen(SVnode* pVnode, SMeta** ppMeta, int8_t rollback);
int metaClose(SMeta* pMeta); int metaClose(SMeta* pMeta);
int metaBegin(SMeta* pMeta, int8_t fromSys); int metaBegin(SMeta* pMeta, int8_t fromSys);
...@@ -108,6 +112,7 @@ TXN* metaGetTxn(SMeta* pMeta); ...@@ -108,6 +112,7 @@ TXN* metaGetTxn(SMeta* pMeta);
int metaCommit(SMeta* pMeta, TXN* txn); int metaCommit(SMeta* pMeta, TXN* txn);
int metaFinishCommit(SMeta* pMeta, TXN* txn); int metaFinishCommit(SMeta* pMeta, TXN* txn);
int metaPrepareAsyncCommit(SMeta* pMeta); int metaPrepareAsyncCommit(SMeta* pMeta);
int metaAbort(SMeta* pMeta);
int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList); int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList);
......
...@@ -19,19 +19,20 @@ static FORCE_INLINE void *metaMalloc(void *pPool, size_t size) { return vnodeBuf ...@@ -19,19 +19,20 @@ static FORCE_INLINE void *metaMalloc(void *pPool, size_t size) { return vnodeBuf
static FORCE_INLINE void metaFree(void *pPool, void *p) { vnodeBufPoolFree((SVBufPool *)pPool, p); } static FORCE_INLINE void metaFree(void *pPool, void *p) { vnodeBufPoolFree((SVBufPool *)pPool, p); }
// begin a meta txn // begin a meta txn
int metaBegin(SMeta *pMeta, int8_t fromSys) { int metaBegin(SMeta *pMeta, int8_t heap) {
void *(*xMalloc)(void *, size_t); void *(*xMalloc)(void *, size_t) = NULL;
void (*xFree)(void *, void *); void (*xFree)(void *, void *) = NULL;
void *xArg = NULL; void *xArg = NULL;
if (fromSys) { if (heap == META_BEGIN_HEAP_OS) {
xMalloc = tdbDefaultMalloc; xMalloc = tdbDefaultMalloc;
xFree = tdbDefaultFree; xFree = tdbDefaultFree;
} else { } else if (heap == META_BEGIN_HEAP_BUFFERPOOL) {
xMalloc = metaMalloc; xMalloc = metaMalloc;
xFree = metaFree; xFree = metaFree;
xArg = pMeta->pVnode->inUse; xArg = pMeta->pVnode->inUse;
} }
if (tdbBegin(pMeta->pEnv, &pMeta->txn, xMalloc, xFree, xArg, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { if (tdbBegin(pMeta->pEnv, &pMeta->txn, xMalloc, xFree, xArg, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
return -1; return -1;
} }
......
...@@ -145,7 +145,7 @@ int32_t metaSnapWriterOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapWr ...@@ -145,7 +145,7 @@ int32_t metaSnapWriterOpen(SMeta* pMeta, int64_t sver, int64_t ever, SMetaSnapWr
pWriter->sver = sver; pWriter->sver = sver;
pWriter->ever = ever; pWriter->ever = ever;
metaBegin(pMeta, 1); metaBegin(pMeta, META_BEGIN_HEAP_NIL);
*ppWriter = pWriter; *ppWriter = pWriter;
return code; return code;
...@@ -161,7 +161,8 @@ int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback) { ...@@ -161,7 +161,8 @@ int32_t metaSnapWriterClose(SMetaSnapWriter** ppWriter, int8_t rollback) {
SMetaSnapWriter* pWriter = *ppWriter; SMetaSnapWriter* pWriter = *ppWriter;
if (rollback) { if (rollback) {
ASSERT(0); code = metaAbort(pWriter->pMeta);
if (code) goto _err;
} else { } else {
code = metaCommit(pWriter->pMeta, pWriter->pMeta->txn); code = metaCommit(pWriter->pMeta, pWriter->pMeta->txn);
if (code) goto _err; if (code) goto _err;
......
...@@ -38,7 +38,7 @@ int vnodeBegin(SVnode *pVnode) { ...@@ -38,7 +38,7 @@ int vnodeBegin(SVnode *pVnode) {
pVnode->state.commitID++; pVnode->state.commitID++;
// begin meta // begin meta
if (metaBegin(pVnode->pMeta, 0) < 0) { if (metaBegin(pVnode->pMeta, META_BEGIN_HEAP_BUFFERPOOL) < 0) {
vError("vgId:%d, failed to begin meta since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d, failed to begin meta since %s", TD_VID(pVnode), tstrerror(terrno));
return -1; return -1;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册