From 873ad8839bed1e23dffbf375bd4fa9dfc4a7a43e Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Mon, 22 May 2023 10:37:55 +0800 Subject: [PATCH] enh: add prepareActions in STrans --- source/dnode/mnode/impl/inc/mndDef.h | 1 + source/dnode/mnode/impl/inc/mndTrans.h | 4 +- source/dnode/mnode/impl/src/mndSync.c | 2 +- source/dnode/mnode/impl/src/mndTrans.c | 375 ++++++++++-------------- source/dnode/mnode/impl/src/mndVgroup.c | 4 +- 5 files changed, 170 insertions(+), 216 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 4ce7bb94eb..cd04b989ae 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -157,6 +157,7 @@ typedef struct { void* rpcRsp; int32_t rpcRspLen; int32_t redoActionPos; + SArray* prepareActions; SArray* redoActions; SArray* undoActions; SArray* commitActions; diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index 0d70434622..9fa59b4ac6 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -70,6 +70,7 @@ int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendNullLog(STrans *pTrans); +int32_t mndTransAppendPrepareAction(STrans *pTrans, STransAction *pRaw); int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction); int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction); void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen); @@ -86,7 +87,8 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans); int32_t mndTransProcessRsp(SRpcMsg *pRsp); void mndTransPullup(SMnode *pMnode); int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans); -void mndTransExecute(SMnode *pMnode, STrans *pTrans, bool isLeader); +void mndTransExecute(SMnode *pMnode, STrans *pTrans); +void mndTransRefresh(SMnode *pMnode, STrans *pTrans); int32_t mndSetRpcInfoForDbTrans(SMnode *pMnode, SRpcMsg *pMsg, EOperType oper, const char *dbname); #ifdef __cplusplus diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 0a6df02f5f..02925f9d8c 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -116,7 +116,7 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta if (pTrans != NULL) { mInfo("trans:%d, execute in mnode which not leader or sync timeout, createTime:%" PRId64 " saved trans:%d", transId, pTrans->createdTime, pMgmt->transId); - mndTransExecute(pMnode, pTrans, false); + mndTransRefresh(pMnode, pTrans); mndReleaseTrans(pMnode, pTrans); } else { mError("trans:%d, not found while execute in mnode since %s", transId, terrstr()); diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index d23662072d..69a6053ba6 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -23,15 +23,16 @@ #include "mndSync.h" #include "mndUser.h" -#define TRANS_VER_NUMBER 1 +#define TRANS_VER1_NUMBER 1 +#define TRANS_VER2_NUMBER 2 #define TRANS_ARRAY_SIZE 8 #define TRANS_RESERVE_SIZE 48 -static SSdbRaw *mndTransActionEncode(STrans *pTrans); -static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw); +static SSdbRaw *mndTransEncode(STrans *pTrans); +static SSdbRow *mndTransDecode(SSdbRaw *pRaw); static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans); static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *OldTrans, STrans *pOld); -static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans, bool callFunc); +static int32_t mndTransDelete(SSdb *pSdb, STrans *pTrans, bool callFunc); static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw); static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction); @@ -67,11 +68,11 @@ int32_t mndInitTrans(SMnode *pMnode) { SSdbTable table = { .sdbType = SDB_TRANS, .keyType = SDB_KEY_INT32, - .encodeFp = (SdbEncodeFp)mndTransActionEncode, - .decodeFp = (SdbDecodeFp)mndTransActionDecode, + .encodeFp = (SdbEncodeFp)mndTransEncode, + .decodeFp = (SdbDecodeFp)mndTransDecode, .insertFp = (SdbInsertFp)mndTransActionInsert, .updateFp = (SdbUpdateFp)mndTransActionUpdate, - .deleteFp = (SdbDeleteFp)mndTransActionDelete, + .deleteFp = (SdbDeleteFp)mndTransDelete, }; mndSetMsgHandle(pMnode, TDMT_MND_TRANS_TIMER, mndProcessTransTimer); @@ -103,15 +104,55 @@ static int32_t mndTransGetActionsSize(SArray *pArray) { return rawDataLen; } -static SSdbRaw *mndTransActionEncode(STrans *pTrans) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + +static int32_t mndTransEncodeAction(SSdbRaw *pRaw, int32_t *offset, SArray *pActions, int32_t actionsNum) { + int32_t dataPos = *offset; + int8_t unused = 0; + int32_t ret = -1; + + for (int32_t i = 0; i < actionsNum; ++i) { + STransAction *pAction = taosArrayGet(pActions, i); + SDB_SET_INT32(pRaw, dataPos, pAction->id, _OVER) + SDB_SET_INT32(pRaw, dataPos, pAction->errCode, _OVER) + SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER) + SDB_SET_INT32(pRaw, dataPos, pAction->retryCode, _OVER) + SDB_SET_INT8(pRaw, dataPos, pAction->actionType, _OVER) + SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER) + SDB_SET_INT8(pRaw, dataPos, pAction->reserved, _OVER) + if (pAction->actionType == TRANS_ACTION_RAW) { + int32_t len = sdbGetRawTotalSize(pAction->pRaw); + SDB_SET_INT8(pRaw, dataPos, unused /*pAction->rawWritten*/, _OVER) + SDB_SET_INT32(pRaw, dataPos, len, _OVER) + SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER) + } else if (pAction->actionType == TRANS_ACTION_MSG) { + SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER) + SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER) + SDB_SET_INT8(pRaw, dataPos, unused /*pAction->msgSent*/, _OVER) + SDB_SET_INT8(pRaw, dataPos, unused /*pAction->msgReceived*/, _OVER) + SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER) + SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER) + } else { + // nothing + } + } + ret = 0; + +_OVER: + *offset = dataPos; + return ret; +} + +static SSdbRaw *mndTransEncode(STrans *pTrans) { + terrno = TSDB_CODE_INVALID_MSG; + int8_t sver = taosArrayGetSize(pTrans->prepareActions) ? TRANS_VER2_NUMBER : TRANS_VER1_NUMBER; int32_t rawDataLen = sizeof(STrans) + TRANS_RESERVE_SIZE + pTrans->paramLen; + rawDataLen += mndTransGetActionsSize(pTrans->prepareActions); rawDataLen += mndTransGetActionsSize(pTrans->redoActions); rawDataLen += mndTransGetActionsSize(pTrans->undoActions); rawDataLen += mndTransGetActionsSize(pTrans->commitActions); - SSdbRaw *pRaw = sdbAllocRaw(SDB_TRANS, TRANS_VER_NUMBER, rawDataLen); + SSdbRaw *pRaw = sdbAllocRaw(SDB_TRANS, sver, rawDataLen); if (pRaw == NULL) { mError("trans:%d, failed to alloc raw since %s", pTrans->id, terrstr()); return NULL; @@ -131,91 +172,22 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { SDB_SET_BINARY(pRaw, dataPos, pTrans->stbname, TSDB_TABLE_FNAME_LEN, _OVER) SDB_SET_INT32(pRaw, dataPos, pTrans->redoActionPos, _OVER) + int32_t prepareActionNum = taosArrayGetSize(pTrans->prepareActions); int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions); int32_t undoActionNum = taosArrayGetSize(pTrans->undoActions); int32_t commitActionNum = taosArrayGetSize(pTrans->commitActions); + + if (sver > TRANS_VER1_NUMBER) { + SDB_SET_INT32(pRaw, dataPos, prepareActionNum, _OVER) + } SDB_SET_INT32(pRaw, dataPos, redoActionNum, _OVER) SDB_SET_INT32(pRaw, dataPos, undoActionNum, _OVER) SDB_SET_INT32(pRaw, dataPos, commitActionNum, _OVER) - int8_t unused = 0; - for (int32_t i = 0; i < redoActionNum; ++i) { - STransAction *pAction = taosArrayGet(pTrans->redoActions, i); - SDB_SET_INT32(pRaw, dataPos, pAction->id, _OVER) - SDB_SET_INT32(pRaw, dataPos, pAction->errCode, _OVER) - SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER) - SDB_SET_INT32(pRaw, dataPos, pAction->retryCode, _OVER) - SDB_SET_INT8(pRaw, dataPos, pAction->actionType, _OVER) - SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER) - SDB_SET_INT8(pRaw, dataPos, pAction->reserved, _OVER) - if (pAction->actionType == TRANS_ACTION_RAW) { - int32_t len = sdbGetRawTotalSize(pAction->pRaw); - SDB_SET_INT8(pRaw, dataPos, unused /*pAction->rawWritten*/, _OVER) - SDB_SET_INT32(pRaw, dataPos, len, _OVER) - SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER) - } else if (pAction->actionType == TRANS_ACTION_MSG) { - SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER) - SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER) - SDB_SET_INT8(pRaw, dataPos, unused /*pAction->msgSent*/, _OVER) - SDB_SET_INT8(pRaw, dataPos, unused /*pAction->msgReceived*/, _OVER) - SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER) - SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER) - } else { - // nothing - } - } - - for (int32_t i = 0; i < undoActionNum; ++i) { - STransAction *pAction = taosArrayGet(pTrans->undoActions, i); - SDB_SET_INT32(pRaw, dataPos, pAction->id, _OVER) - SDB_SET_INT32(pRaw, dataPos, pAction->errCode, _OVER) - SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER) - SDB_SET_INT32(pRaw, dataPos, pAction->retryCode, _OVER) - SDB_SET_INT8(pRaw, dataPos, pAction->actionType, _OVER) - SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER) - SDB_SET_INT8(pRaw, dataPos, pAction->reserved, _OVER) - if (pAction->actionType == TRANS_ACTION_RAW) { - int32_t len = sdbGetRawTotalSize(pAction->pRaw); - SDB_SET_INT8(pRaw, dataPos, unused /*pAction->rawWritten*/, _OVER) - SDB_SET_INT32(pRaw, dataPos, len, _OVER) - SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER) - } else if (pAction->actionType == TRANS_ACTION_MSG) { - SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER) - SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER) - SDB_SET_INT8(pRaw, dataPos, unused /*pAction->msgSent*/, _OVER) - SDB_SET_INT8(pRaw, dataPos, unused /*pAction->msgReceived*/, _OVER) - SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER) - SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER) - } else { - // nothing - } - } - - for (int32_t i = 0; i < commitActionNum; ++i) { - STransAction *pAction = taosArrayGet(pTrans->commitActions, i); - SDB_SET_INT32(pRaw, dataPos, pAction->id, _OVER) - SDB_SET_INT32(pRaw, dataPos, pAction->errCode, _OVER) - SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER) - SDB_SET_INT32(pRaw, dataPos, pAction->retryCode, _OVER) - SDB_SET_INT8(pRaw, dataPos, pAction->actionType, _OVER) - SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER) - SDB_SET_INT8(pRaw, dataPos, pAction->reserved, _OVER) - if (pAction->actionType == TRANS_ACTION_RAW) { - int32_t len = sdbGetRawTotalSize(pAction->pRaw); - SDB_SET_INT8(pRaw, dataPos, unused /*pAction->rawWritten*/, _OVER) - SDB_SET_INT32(pRaw, dataPos, len, _OVER) - SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER) - } else if (pAction->actionType == TRANS_ACTION_MSG) { - SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER) - SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER) - SDB_SET_INT8(pRaw, dataPos, unused /*pAction->msgSent*/, _OVER) - SDB_SET_INT8(pRaw, dataPos, unused /*pAction->msgReceived*/, _OVER) - SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER) - SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER) - } else { - // nothing - } - } + if (mndTransEncodeAction(pRaw, &dataPos, pTrans->prepareActions, prepareActionNum) < 0) goto _OVER; + if (mndTransEncodeAction(pRaw, &dataPos, pTrans->redoActions, redoActionNum) < 0) goto _OVER; + if (mndTransEncodeAction(pRaw, &dataPos, pTrans->undoActions, undoActionNum) < 0) goto _OVER; + if (mndTransEncodeAction(pRaw, &dataPos, pTrans->commitActions, commitActionNum) < 0) goto _OVER; SDB_SET_INT32(pRaw, dataPos, pTrans->startFunc, _OVER) SDB_SET_INT32(pRaw, dataPos, pTrans->stopFunc, _OVER) @@ -242,14 +214,67 @@ _OVER: return pRaw; } -static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { - terrno = TSDB_CODE_OUT_OF_MEMORY; +static int32_t mndTransDecodeAction(SSdbRaw *pRaw, int32_t *offset, SArray *pActions, int32_t actionNum) { + STransAction action = {0}; + int32_t dataPos = *offset; + int8_t unused = 0; + int8_t stage = 0; + int8_t actionType = 0; + int32_t dataLen = 0; + int32_t ret = -1; + + for (int32_t i = 0; i < actionNum; ++i) { + memset(&action, 0, sizeof(action)); + SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER) + SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER) + SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER) + SDB_GET_INT32(pRaw, dataPos, &action.retryCode, _OVER) + SDB_GET_INT8(pRaw, dataPos, &actionType, _OVER) + action.actionType = actionType; + SDB_GET_INT8(pRaw, dataPos, &stage, _OVER) + action.stage = stage; + SDB_GET_INT8(pRaw, dataPos, &action.reserved, _OVER) + if (action.actionType == TRANS_ACTION_RAW) { + SDB_GET_INT8(pRaw, dataPos, &unused /*&action.rawWritten*/, _OVER) + SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER) + action.pRaw = taosMemoryMalloc(dataLen); + if (action.pRaw == NULL) goto _OVER; + mTrace("raw:%p, is created", action.pRaw); + SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER); + if (taosArrayPush(pActions, &action) == NULL) goto _OVER; + action.pRaw = NULL; + } else if (action.actionType == TRANS_ACTION_MSG) { + SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER); + tmsgUpdateDnodeEpSet(&action.epSet); + SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER) + SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgSent*/, _OVER) + SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgReceived*/, _OVER) + SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER) + action.pCont = taosMemoryMalloc(action.contLen); + if (action.pCont == NULL) goto _OVER; + SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER); + if (taosArrayPush(pActions, &action) == NULL) goto _OVER; + action.pCont = NULL; + } else { + if (taosArrayPush(pActions, &action) == NULL) goto _OVER; + } + } + ret = 0; + +_OVER: + *offset = dataPos; + return ret; +} + +static SSdbRow *mndTransDecode(SSdbRaw *pRaw) { + terrno = TSDB_CODE_INVALID_MSG; SSdbRow *pRow = NULL; STrans *pTrans = NULL; char *pData = NULL; int32_t dataLen = 0; int8_t sver = 0; + int32_t prepareActionNum = 0; int32_t redoActionNum = 0; int32_t undoActionNum = 0; int32_t commitActionNum = 0; @@ -258,7 +283,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER; - if (sver != TRANS_VER_NUMBER) { + if (sver != TRANS_VER1_NUMBER && sver != TRANS_VER2_NUMBER) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; goto _OVER; } @@ -294,127 +319,28 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_TABLE_FNAME_LEN, _OVER) SDB_GET_BINARY(pRaw, dataPos, pTrans->stbname, TSDB_TABLE_FNAME_LEN, _OVER) SDB_GET_INT32(pRaw, dataPos, &pTrans->redoActionPos, _OVER) + + if (sver > TRANS_VER1_NUMBER) { + SDB_GET_INT32(pRaw, dataPos, &prepareActionNum, _OVER) + } SDB_GET_INT32(pRaw, dataPos, &redoActionNum, _OVER) SDB_GET_INT32(pRaw, dataPos, &undoActionNum, _OVER) SDB_GET_INT32(pRaw, dataPos, &commitActionNum, _OVER) + pTrans->prepareActions = taosArrayInit(prepareActionNum, sizeof(STransAction)); pTrans->redoActions = taosArrayInit(redoActionNum, sizeof(STransAction)); pTrans->undoActions = taosArrayInit(undoActionNum, sizeof(STransAction)); pTrans->commitActions = taosArrayInit(commitActionNum, sizeof(STransAction)); + if (pTrans->prepareActions == NULL) goto _OVER; if (pTrans->redoActions == NULL) goto _OVER; if (pTrans->undoActions == NULL) goto _OVER; if (pTrans->commitActions == NULL) goto _OVER; - int8_t unused = 0; - for (int32_t i = 0; i < redoActionNum; ++i) { - memset(&action, 0, sizeof(action)); - SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER) - SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER) - SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER) - SDB_GET_INT32(pRaw, dataPos, &action.retryCode, _OVER) - SDB_GET_INT8(pRaw, dataPos, &actionType, _OVER) - action.actionType = actionType; - SDB_GET_INT8(pRaw, dataPos, &stage, _OVER) - action.stage = stage; - SDB_GET_INT8(pRaw, dataPos, &action.reserved, _OVER) - if (action.actionType == TRANS_ACTION_RAW) { - SDB_GET_INT8(pRaw, dataPos, &unused /*&action.rawWritten*/, _OVER) - SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER) - action.pRaw = taosMemoryMalloc(dataLen); - if (action.pRaw == NULL) goto _OVER; - mTrace("raw:%p, is created", action.pRaw); - SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER); - if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER; - action.pRaw = NULL; - } else if (action.actionType == TRANS_ACTION_MSG) { - SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER); - tmsgUpdateDnodeEpSet(&action.epSet); - SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER) - SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgSent*/, _OVER) - SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgReceived*/, _OVER) - SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER) - action.pCont = taosMemoryMalloc(action.contLen); - if (action.pCont == NULL) goto _OVER; - SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER); - if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER; - action.pCont = NULL; - } else { - if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER; - } - } - - for (int32_t i = 0; i < undoActionNum; ++i) { - memset(&action, 0, sizeof(action)); - SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER) - SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER) - SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER) - SDB_GET_INT32(pRaw, dataPos, &action.retryCode, _OVER) - SDB_GET_INT8(pRaw, dataPos, &actionType, _OVER) - action.actionType = actionType; - SDB_GET_INT8(pRaw, dataPos, &stage, _OVER) - action.stage = stage; - SDB_GET_INT8(pRaw, dataPos, &action.reserved, _OVER) - if (action.actionType == TRANS_ACTION_RAW) { - SDB_GET_INT8(pRaw, dataPos, &unused /*&action.rawWritten*/, _OVER) - SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER) - action.pRaw = taosMemoryMalloc(dataLen); - if (action.pRaw == NULL) goto _OVER; - mTrace("raw:%p, is created", action.pRaw); - SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER); - if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto _OVER; - action.pRaw = NULL; - } else if (action.actionType == TRANS_ACTION_MSG) { - SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER); - SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER) - SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgSent*/, _OVER) - SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgReceived*/, _OVER) - SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER) - action.pCont = taosMemoryMalloc(action.contLen); - if (action.pCont == NULL) goto _OVER; - SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER); - if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto _OVER; - action.pCont = NULL; - } else { - if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto _OVER; - } - } - - for (int32_t i = 0; i < commitActionNum; ++i) { - memset(&action, 0, sizeof(action)); - SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER) - SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER) - SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER) - SDB_GET_INT32(pRaw, dataPos, &action.retryCode, _OVER) - SDB_GET_INT8(pRaw, dataPos, &actionType, _OVER) - action.actionType = actionType; - SDB_GET_INT8(pRaw, dataPos, &stage, _OVER) - action.stage = stage; - SDB_GET_INT8(pRaw, dataPos, &action.reserved, _OVER) - if (action.actionType) { - SDB_GET_INT8(pRaw, dataPos, &unused /*&action.rawWritten*/, _OVER) - SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER) - action.pRaw = taosMemoryMalloc(dataLen); - if (action.pRaw == NULL) goto _OVER; - mTrace("raw:%p, is created", action.pRaw); - SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER); - if (taosArrayPush(pTrans->commitActions, &action) == NULL) goto _OVER; - action.pRaw = NULL; - } else if (action.actionType == TRANS_ACTION_MSG) { - SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER); - SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER) - SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgSent*/, _OVER) - SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgReceived*/, _OVER) - SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER) - action.pCont = taosMemoryMalloc(action.contLen); - if (action.pCont == NULL) goto _OVER; - SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER); - if (taosArrayPush(pTrans->commitActions, &action) == NULL) goto _OVER; - action.pCont = NULL; - } else { - if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER; - } - } + if (mndTransDecodeAction(pRaw, &dataPos, pTrans->prepareActions, prepareActionNum) < 0) goto _OVER; + if (mndTransDecodeAction(pRaw, &dataPos, pTrans->redoActions, redoActionNum) < 0) goto _OVER; + if (mndTransDecodeAction(pRaw, &dataPos, pTrans->undoActions, undoActionNum) < 0) goto _OVER; + if (mndTransDecodeAction(pRaw, &dataPos, pTrans->commitActions, commitActionNum) < 0) goto _OVER; SDB_GET_INT32(pRaw, dataPos, &pTrans->startFunc, _OVER) SDB_GET_INT32(pRaw, dataPos, &pTrans->stopFunc, _OVER) @@ -520,6 +446,10 @@ static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) { } static void mndTransDropData(STrans *pTrans) { + if (pTrans->prepareActions != NULL) { + mndTransDropActions(pTrans->prepareActions); + pTrans->prepareActions = NULL; + } if (pTrans->redoActions != NULL) { mndTransDropActions(pTrans->redoActions); pTrans->redoActions = NULL; @@ -549,7 +479,7 @@ static void mndTransDropData(STrans *pTrans) { (void)taosThreadMutexDestroy(&pTrans->mutex); } -static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans, bool callFunc) { +static int32_t mndTransDelete(SSdb *pSdb, STrans *pTrans, bool callFunc) { mInfo("trans:%d, perform delete action, row:%p stage:%s callfunc:%d, stopFunc:%d", pTrans->id, pTrans, mndTransStr(pTrans->stage), callFunc, pTrans->stopFunc); @@ -590,6 +520,7 @@ static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) { return -1; } + mndTransUpdateActions(pOld->prepareActions, pNew->prepareActions); mndTransUpdateActions(pOld->redoActions, pNew->redoActions); mndTransUpdateActions(pOld->undoActions, pNew->undoActions); mndTransUpdateActions(pOld->commitActions, pNew->commitActions); @@ -646,6 +577,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict, pTrans->conflict = conflict; pTrans->exec = TRN_EXEC_PARALLEL; pTrans->createdTime = taosGetTimestampMs(); + pTrans->prepareActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); pTrans->redoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); pTrans->undoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); pTrans->commitActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); @@ -728,6 +660,13 @@ int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) { return mndTransAppendAction(pTrans->commitActions, &action); } +int32_t mndTransAppendPrepareAction(STrans *pTrans, STransAction *pAction) { + pAction->stage = TRN_STAGE_PREPARE; + pAction->actionType = TRANS_ACTION_RAW; + pAction->mTraceId = pTrans->mTraceId; + return mndTransAppendAction(pTrans->prepareActions, pAction); +} + int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction) { pAction->stage = TRN_STAGE_REDO_ACTION; pAction->actionType = TRANS_ACTION_MSG; @@ -800,7 +739,7 @@ void mndTransSetParallel(STrans *pTrans) { pTrans->exec = TRN_EXEC_PARALLEL; } void mndTransSetOper(STrans *pTrans, EOperType oper) { pTrans->oper = oper; } static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { - SSdbRaw *pRaw = mndTransActionEncode(pTrans); + SSdbRaw *pRaw = mndTransEncode(pTrans); if (pRaw == NULL) { mError("trans:%d, failed to encode while sync trans since %s", pTrans->id, terrstr()); return -1; @@ -922,7 +861,7 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { pTrans->rpcRsp = NULL; pTrans->rpcRspLen = 0; - mndTransExecute(pMnode, pNew, true); + mndTransExecute(pMnode, pNew); mndReleaseTrans(pMnode, pNew); return 0; } @@ -1104,7 +1043,7 @@ int32_t mndTransProcessRsp(SRpcMsg *pRsp) { mInfo("trans:%d, invalid action, index:%d, code:0x%x", transId, action, pRsp->code); } - mndTransExecute(pMnode, pTrans, true); + mndTransExecute(pMnode, pTrans); _OVER: mndReleaseTrans(pMnode, pTrans); @@ -1550,7 +1489,7 @@ static bool mndTransPerformPreFinishStage(SMnode *pMnode, STrans *pTrans) { static bool mndTransPerformFinishStage(SMnode *pMnode, STrans *pTrans) { bool continueExec = false; - SSdbRaw *pRaw = mndTransActionEncode(pTrans); + SSdbRaw *pRaw = mndTransEncode(pTrans); if (pRaw == NULL) { mError("trans:%d, failed to encode while finish trans since %s", pTrans->id, terrstr()); return false; @@ -1567,12 +1506,12 @@ static bool mndTransPerformFinishStage(SMnode *pMnode, STrans *pTrans) { return continueExec; } -void mndTransExecute(SMnode *pMnode, STrans *pTrans, bool isLeader) { +void mndTransExecuteImp(SMnode *pMnode, STrans *pTrans, bool topHalf) { bool continueExec = true; while (continueExec) { - mInfo("trans:%d, continue to execute, stage:%s createTime:%" PRId64 " leader:%d", pTrans->id, - mndTransStr(pTrans->stage), pTrans->createdTime, isLeader); + mInfo("trans:%d, continue to execute, stage:%s createTime:%" PRId64 " topHalf:%d", pTrans->id, + mndTransStr(pTrans->stage), pTrans->createdTime, topHalf); pTrans->lastExecTime = taosGetTimestampMs(); switch (pTrans->stage) { case TRN_STAGE_PREPARE: @@ -1582,7 +1521,7 @@ void mndTransExecute(SMnode *pMnode, STrans *pTrans, bool isLeader) { continueExec = mndTransPerformRedoActionStage(pMnode, pTrans); break; case TRN_STAGE_COMMIT: - if (isLeader) { + if (topHalf) { continueExec = mndTransPerformCommitStage(pMnode, pTrans); } else { mInfo("trans:%d, can not commit since not leader", pTrans->id); @@ -1593,7 +1532,7 @@ void mndTransExecute(SMnode *pMnode, STrans *pTrans, bool isLeader) { continueExec = mndTransPerformCommitActionStage(pMnode, pTrans); break; case TRN_STAGE_ROLLBACK: - if (isLeader) { + if (topHalf) { continueExec = mndTransPerformRollbackStage(pMnode, pTrans); } else { mInfo("trans:%d, can not rollback since not leader", pTrans->id); @@ -1604,7 +1543,7 @@ void mndTransExecute(SMnode *pMnode, STrans *pTrans, bool isLeader) { continueExec = mndTransPerformUndoActionStage(pMnode, pTrans); break; case TRN_STAGE_PRE_FINISH: - if (isLeader) { + if (topHalf) { continueExec = mndTransPerformPreFinishStage(pMnode, pTrans); } else { mInfo("trans:%d, can not pre-finish since not leader", pTrans->id); @@ -1623,6 +1562,16 @@ void mndTransExecute(SMnode *pMnode, STrans *pTrans, bool isLeader) { mndTransSendRpcRsp(pMnode, pTrans); } +void mndTransExecute(SMnode *pMnode, STrans *pTrans) { + bool topHalf = true; + return mndTransExecuteImp(pMnode, pTrans, topHalf); +} + +void mndTransRefresh(SMnode *pMnode, STrans *pTrans) { + bool topHalf = false; + return mndTransExecuteImp(pMnode, pTrans, topHalf); +} + static int32_t mndProcessTransTimer(SRpcMsg *pReq) { mTrace("start to process trans timer"); mndTransPullup(pReq->info.node); @@ -1649,7 +1598,7 @@ int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans) { pAction->errCode = 0; } - mndTransExecute(pMnode, pTrans, true); + mndTransExecute(pMnode, pTrans); return 0; } @@ -1707,7 +1656,7 @@ void mndTransPullup(SMnode *pMnode) { int32_t *pTransId = taosArrayGet(pArray, i); STrans *pTrans = mndAcquireTrans(pMnode, *pTransId); if (pTrans != NULL) { - mndTransExecute(pMnode, pTrans, true); + mndTransExecute(pMnode, pTrans); } mndReleaseTrans(pMnode, pTrans); } diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 7691f0f8f4..a5a364023d 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -2255,11 +2255,13 @@ int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgro SDbObj dbObj = {0}; SArray *pArray = mndBuildDnodesArray(pMnode, 0); - pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "split-vgroup"); + pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "split-vgroup"); if (pTrans == NULL) goto _OVER; mndTransSetSerial(pTrans); mInfo("trans:%d, used to split vgroup, vgId:%d", pTrans->id, pVgroup->vgId); + mndTransSetDbName(pTrans, pDb->name, NULL); + SVgObj newVg1 = {0}; memcpy(&newVg1, pVgroup, sizeof(SVgObj)); mInfo("vgId:%d, vgroup info before split, replica:%d hashBegin:%u hashEnd:%u", newVg1.vgId, newVg1.replica, -- GitLab