提交 873ad883 编写于 作者: B Benguang Zhao

enh: add prepareActions in STrans

上级 f8841046
......@@ -157,6 +157,7 @@ typedef struct {
void* rpcRsp;
int32_t rpcRspLen;
int32_t redoActionPos;
SArray* prepareActions;
SArray* redoActions;
SArray* undoActions;
SArray* commitActions;
......
......@@ -70,6 +70,7 @@ int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendNullLog(STrans *pTrans);
int32_t mndTransAppendPrepareAction(STrans *pTrans, STransAction *pRaw);
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction);
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
......@@ -86,7 +87,8 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
int32_t mndTransProcessRsp(SRpcMsg *pRsp);
void mndTransPullup(SMnode *pMnode);
int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans);
void mndTransExecute(SMnode *pMnode, STrans *pTrans, bool isLeader);
void mndTransExecute(SMnode *pMnode, STrans *pTrans);
void mndTransRefresh(SMnode *pMnode, STrans *pTrans);
int32_t mndSetRpcInfoForDbTrans(SMnode *pMnode, SRpcMsg *pMsg, EOperType oper, const char *dbname);
#ifdef __cplusplus
......
......@@ -116,7 +116,7 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta
if (pTrans != NULL) {
mInfo("trans:%d, execute in mnode which not leader or sync timeout, createTime:%" PRId64 " saved trans:%d",
transId, pTrans->createdTime, pMgmt->transId);
mndTransExecute(pMnode, pTrans, false);
mndTransRefresh(pMnode, pTrans);
mndReleaseTrans(pMnode, pTrans);
} else {
mError("trans:%d, not found while execute in mnode since %s", transId, terrstr());
......
......@@ -23,15 +23,16 @@
#include "mndSync.h"
#include "mndUser.h"
#define TRANS_VER_NUMBER 1
#define TRANS_VER1_NUMBER 1
#define TRANS_VER2_NUMBER 2
#define TRANS_ARRAY_SIZE 8
#define TRANS_RESERVE_SIZE 48
static SSdbRaw *mndTransActionEncode(STrans *pTrans);
static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw);
static SSdbRaw *mndTransEncode(STrans *pTrans);
static SSdbRow *mndTransDecode(SSdbRaw *pRaw);
static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans);
static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *OldTrans, STrans *pOld);
static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans, bool callFunc);
static int32_t mndTransDelete(SSdb *pSdb, STrans *pTrans, bool callFunc);
static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw);
static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction);
......@@ -67,11 +68,11 @@ int32_t mndInitTrans(SMnode *pMnode) {
SSdbTable table = {
.sdbType = SDB_TRANS,
.keyType = SDB_KEY_INT32,
.encodeFp = (SdbEncodeFp)mndTransActionEncode,
.decodeFp = (SdbDecodeFp)mndTransActionDecode,
.encodeFp = (SdbEncodeFp)mndTransEncode,
.decodeFp = (SdbDecodeFp)mndTransDecode,
.insertFp = (SdbInsertFp)mndTransActionInsert,
.updateFp = (SdbUpdateFp)mndTransActionUpdate,
.deleteFp = (SdbDeleteFp)mndTransActionDelete,
.deleteFp = (SdbDeleteFp)mndTransDelete,
};
mndSetMsgHandle(pMnode, TDMT_MND_TRANS_TIMER, mndProcessTransTimer);
......@@ -103,15 +104,55 @@ static int32_t mndTransGetActionsSize(SArray *pArray) {
return rawDataLen;
}
static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
static int32_t mndTransEncodeAction(SSdbRaw *pRaw, int32_t *offset, SArray *pActions, int32_t actionsNum) {
int32_t dataPos = *offset;
int8_t unused = 0;
int32_t ret = -1;
for (int32_t i = 0; i < actionsNum; ++i) {
STransAction *pAction = taosArrayGet(pActions, i);
SDB_SET_INT32(pRaw, dataPos, pAction->id, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->errCode, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->retryCode, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->actionType, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->reserved, _OVER)
if (pAction->actionType == TRANS_ACTION_RAW) {
int32_t len = sdbGetRawTotalSize(pAction->pRaw);
SDB_SET_INT8(pRaw, dataPos, unused /*pAction->rawWritten*/, _OVER)
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER)
} else if (pAction->actionType == TRANS_ACTION_MSG) {
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER)
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER)
SDB_SET_INT8(pRaw, dataPos, unused /*pAction->msgSent*/, _OVER)
SDB_SET_INT8(pRaw, dataPos, unused /*pAction->msgReceived*/, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER)
} else {
// nothing
}
}
ret = 0;
_OVER:
*offset = dataPos;
return ret;
}
static SSdbRaw *mndTransEncode(STrans *pTrans) {
terrno = TSDB_CODE_INVALID_MSG;
int8_t sver = taosArrayGetSize(pTrans->prepareActions) ? TRANS_VER2_NUMBER : TRANS_VER1_NUMBER;
int32_t rawDataLen = sizeof(STrans) + TRANS_RESERVE_SIZE + pTrans->paramLen;
rawDataLen += mndTransGetActionsSize(pTrans->prepareActions);
rawDataLen += mndTransGetActionsSize(pTrans->redoActions);
rawDataLen += mndTransGetActionsSize(pTrans->undoActions);
rawDataLen += mndTransGetActionsSize(pTrans->commitActions);
SSdbRaw *pRaw = sdbAllocRaw(SDB_TRANS, TRANS_VER_NUMBER, rawDataLen);
SSdbRaw *pRaw = sdbAllocRaw(SDB_TRANS, sver, rawDataLen);
if (pRaw == NULL) {
mError("trans:%d, failed to alloc raw since %s", pTrans->id, terrstr());
return NULL;
......@@ -131,91 +172,22 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
SDB_SET_BINARY(pRaw, dataPos, pTrans->stbname, TSDB_TABLE_FNAME_LEN, _OVER)
SDB_SET_INT32(pRaw, dataPos, pTrans->redoActionPos, _OVER)
int32_t prepareActionNum = taosArrayGetSize(pTrans->prepareActions);
int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions);
int32_t undoActionNum = taosArrayGetSize(pTrans->undoActions);
int32_t commitActionNum = taosArrayGetSize(pTrans->commitActions);
if (sver > TRANS_VER1_NUMBER) {
SDB_SET_INT32(pRaw, dataPos, prepareActionNum, _OVER)
}
SDB_SET_INT32(pRaw, dataPos, redoActionNum, _OVER)
SDB_SET_INT32(pRaw, dataPos, undoActionNum, _OVER)
SDB_SET_INT32(pRaw, dataPos, commitActionNum, _OVER)
int8_t unused = 0;
for (int32_t i = 0; i < redoActionNum; ++i) {
STransAction *pAction = taosArrayGet(pTrans->redoActions, i);
SDB_SET_INT32(pRaw, dataPos, pAction->id, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->errCode, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->retryCode, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->actionType, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->reserved, _OVER)
if (pAction->actionType == TRANS_ACTION_RAW) {
int32_t len = sdbGetRawTotalSize(pAction->pRaw);
SDB_SET_INT8(pRaw, dataPos, unused /*pAction->rawWritten*/, _OVER)
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER)
} else if (pAction->actionType == TRANS_ACTION_MSG) {
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER)
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER)
SDB_SET_INT8(pRaw, dataPos, unused /*pAction->msgSent*/, _OVER)
SDB_SET_INT8(pRaw, dataPos, unused /*pAction->msgReceived*/, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER)
} else {
// nothing
}
}
for (int32_t i = 0; i < undoActionNum; ++i) {
STransAction *pAction = taosArrayGet(pTrans->undoActions, i);
SDB_SET_INT32(pRaw, dataPos, pAction->id, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->errCode, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->retryCode, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->actionType, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->reserved, _OVER)
if (pAction->actionType == TRANS_ACTION_RAW) {
int32_t len = sdbGetRawTotalSize(pAction->pRaw);
SDB_SET_INT8(pRaw, dataPos, unused /*pAction->rawWritten*/, _OVER)
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER)
} else if (pAction->actionType == TRANS_ACTION_MSG) {
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER)
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER)
SDB_SET_INT8(pRaw, dataPos, unused /*pAction->msgSent*/, _OVER)
SDB_SET_INT8(pRaw, dataPos, unused /*pAction->msgReceived*/, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER)
} else {
// nothing
}
}
for (int32_t i = 0; i < commitActionNum; ++i) {
STransAction *pAction = taosArrayGet(pTrans->commitActions, i);
SDB_SET_INT32(pRaw, dataPos, pAction->id, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->errCode, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->retryCode, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->actionType, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->reserved, _OVER)
if (pAction->actionType == TRANS_ACTION_RAW) {
int32_t len = sdbGetRawTotalSize(pAction->pRaw);
SDB_SET_INT8(pRaw, dataPos, unused /*pAction->rawWritten*/, _OVER)
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER)
} else if (pAction->actionType == TRANS_ACTION_MSG) {
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER)
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER)
SDB_SET_INT8(pRaw, dataPos, unused /*pAction->msgSent*/, _OVER)
SDB_SET_INT8(pRaw, dataPos, unused /*pAction->msgReceived*/, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER)
} else {
// nothing
}
}
if (mndTransEncodeAction(pRaw, &dataPos, pTrans->prepareActions, prepareActionNum) < 0) goto _OVER;
if (mndTransEncodeAction(pRaw, &dataPos, pTrans->redoActions, redoActionNum) < 0) goto _OVER;
if (mndTransEncodeAction(pRaw, &dataPos, pTrans->undoActions, undoActionNum) < 0) goto _OVER;
if (mndTransEncodeAction(pRaw, &dataPos, pTrans->commitActions, commitActionNum) < 0) goto _OVER;
SDB_SET_INT32(pRaw, dataPos, pTrans->startFunc, _OVER)
SDB_SET_INT32(pRaw, dataPos, pTrans->stopFunc, _OVER)
......@@ -242,14 +214,67 @@ _OVER:
return pRaw;
}
static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
static int32_t mndTransDecodeAction(SSdbRaw *pRaw, int32_t *offset, SArray *pActions, int32_t actionNum) {
STransAction action = {0};
int32_t dataPos = *offset;
int8_t unused = 0;
int8_t stage = 0;
int8_t actionType = 0;
int32_t dataLen = 0;
int32_t ret = -1;
for (int32_t i = 0; i < actionNum; ++i) {
memset(&action, 0, sizeof(action));
SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.retryCode, _OVER)
SDB_GET_INT8(pRaw, dataPos, &actionType, _OVER)
action.actionType = actionType;
SDB_GET_INT8(pRaw, dataPos, &stage, _OVER)
action.stage = stage;
SDB_GET_INT8(pRaw, dataPos, &action.reserved, _OVER)
if (action.actionType == TRANS_ACTION_RAW) {
SDB_GET_INT8(pRaw, dataPos, &unused /*&action.rawWritten*/, _OVER)
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
action.pRaw = taosMemoryMalloc(dataLen);
if (action.pRaw == NULL) goto _OVER;
mTrace("raw:%p, is created", action.pRaw);
SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER);
if (taosArrayPush(pActions, &action) == NULL) goto _OVER;
action.pRaw = NULL;
} else if (action.actionType == TRANS_ACTION_MSG) {
SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER);
tmsgUpdateDnodeEpSet(&action.epSet);
SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER)
SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgSent*/, _OVER)
SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgReceived*/, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER)
action.pCont = taosMemoryMalloc(action.contLen);
if (action.pCont == NULL) goto _OVER;
SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER);
if (taosArrayPush(pActions, &action) == NULL) goto _OVER;
action.pCont = NULL;
} else {
if (taosArrayPush(pActions, &action) == NULL) goto _OVER;
}
}
ret = 0;
_OVER:
*offset = dataPos;
return ret;
}
static SSdbRow *mndTransDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_INVALID_MSG;
SSdbRow *pRow = NULL;
STrans *pTrans = NULL;
char *pData = NULL;
int32_t dataLen = 0;
int8_t sver = 0;
int32_t prepareActionNum = 0;
int32_t redoActionNum = 0;
int32_t undoActionNum = 0;
int32_t commitActionNum = 0;
......@@ -258,7 +283,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
if (sver != TRANS_VER_NUMBER) {
if (sver != TRANS_VER1_NUMBER && sver != TRANS_VER2_NUMBER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
goto _OVER;
}
......@@ -294,127 +319,28 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_TABLE_FNAME_LEN, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pTrans->stbname, TSDB_TABLE_FNAME_LEN, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pTrans->redoActionPos, _OVER)
if (sver > TRANS_VER1_NUMBER) {
SDB_GET_INT32(pRaw, dataPos, &prepareActionNum, _OVER)
}
SDB_GET_INT32(pRaw, dataPos, &redoActionNum, _OVER)
SDB_GET_INT32(pRaw, dataPos, &undoActionNum, _OVER)
SDB_GET_INT32(pRaw, dataPos, &commitActionNum, _OVER)
pTrans->prepareActions = taosArrayInit(prepareActionNum, sizeof(STransAction));
pTrans->redoActions = taosArrayInit(redoActionNum, sizeof(STransAction));
pTrans->undoActions = taosArrayInit(undoActionNum, sizeof(STransAction));
pTrans->commitActions = taosArrayInit(commitActionNum, sizeof(STransAction));
if (pTrans->prepareActions == NULL) goto _OVER;
if (pTrans->redoActions == NULL) goto _OVER;
if (pTrans->undoActions == NULL) goto _OVER;
if (pTrans->commitActions == NULL) goto _OVER;
int8_t unused = 0;
for (int32_t i = 0; i < redoActionNum; ++i) {
memset(&action, 0, sizeof(action));
SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.retryCode, _OVER)
SDB_GET_INT8(pRaw, dataPos, &actionType, _OVER)
action.actionType = actionType;
SDB_GET_INT8(pRaw, dataPos, &stage, _OVER)
action.stage = stage;
SDB_GET_INT8(pRaw, dataPos, &action.reserved, _OVER)
if (action.actionType == TRANS_ACTION_RAW) {
SDB_GET_INT8(pRaw, dataPos, &unused /*&action.rawWritten*/, _OVER)
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
action.pRaw = taosMemoryMalloc(dataLen);
if (action.pRaw == NULL) goto _OVER;
mTrace("raw:%p, is created", action.pRaw);
SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER);
if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER;
action.pRaw = NULL;
} else if (action.actionType == TRANS_ACTION_MSG) {
SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER);
tmsgUpdateDnodeEpSet(&action.epSet);
SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER)
SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgSent*/, _OVER)
SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgReceived*/, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER)
action.pCont = taosMemoryMalloc(action.contLen);
if (action.pCont == NULL) goto _OVER;
SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER);
if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER;
action.pCont = NULL;
} else {
if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER;
}
}
for (int32_t i = 0; i < undoActionNum; ++i) {
memset(&action, 0, sizeof(action));
SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.retryCode, _OVER)
SDB_GET_INT8(pRaw, dataPos, &actionType, _OVER)
action.actionType = actionType;
SDB_GET_INT8(pRaw, dataPos, &stage, _OVER)
action.stage = stage;
SDB_GET_INT8(pRaw, dataPos, &action.reserved, _OVER)
if (action.actionType == TRANS_ACTION_RAW) {
SDB_GET_INT8(pRaw, dataPos, &unused /*&action.rawWritten*/, _OVER)
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
action.pRaw = taosMemoryMalloc(dataLen);
if (action.pRaw == NULL) goto _OVER;
mTrace("raw:%p, is created", action.pRaw);
SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER);
if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto _OVER;
action.pRaw = NULL;
} else if (action.actionType == TRANS_ACTION_MSG) {
SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER);
SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER)
SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgSent*/, _OVER)
SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgReceived*/, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER)
action.pCont = taosMemoryMalloc(action.contLen);
if (action.pCont == NULL) goto _OVER;
SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER);
if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto _OVER;
action.pCont = NULL;
} else {
if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto _OVER;
}
}
for (int32_t i = 0; i < commitActionNum; ++i) {
memset(&action, 0, sizeof(action));
SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.retryCode, _OVER)
SDB_GET_INT8(pRaw, dataPos, &actionType, _OVER)
action.actionType = actionType;
SDB_GET_INT8(pRaw, dataPos, &stage, _OVER)
action.stage = stage;
SDB_GET_INT8(pRaw, dataPos, &action.reserved, _OVER)
if (action.actionType) {
SDB_GET_INT8(pRaw, dataPos, &unused /*&action.rawWritten*/, _OVER)
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
action.pRaw = taosMemoryMalloc(dataLen);
if (action.pRaw == NULL) goto _OVER;
mTrace("raw:%p, is created", action.pRaw);
SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER);
if (taosArrayPush(pTrans->commitActions, &action) == NULL) goto _OVER;
action.pRaw = NULL;
} else if (action.actionType == TRANS_ACTION_MSG) {
SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER);
SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER)
SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgSent*/, _OVER)
SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgReceived*/, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER)
action.pCont = taosMemoryMalloc(action.contLen);
if (action.pCont == NULL) goto _OVER;
SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER);
if (taosArrayPush(pTrans->commitActions, &action) == NULL) goto _OVER;
action.pCont = NULL;
} else {
if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER;
}
}
if (mndTransDecodeAction(pRaw, &dataPos, pTrans->prepareActions, prepareActionNum) < 0) goto _OVER;
if (mndTransDecodeAction(pRaw, &dataPos, pTrans->redoActions, redoActionNum) < 0) goto _OVER;
if (mndTransDecodeAction(pRaw, &dataPos, pTrans->undoActions, undoActionNum) < 0) goto _OVER;
if (mndTransDecodeAction(pRaw, &dataPos, pTrans->commitActions, commitActionNum) < 0) goto _OVER;
SDB_GET_INT32(pRaw, dataPos, &pTrans->startFunc, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pTrans->stopFunc, _OVER)
......@@ -520,6 +446,10 @@ static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) {
}
static void mndTransDropData(STrans *pTrans) {
if (pTrans->prepareActions != NULL) {
mndTransDropActions(pTrans->prepareActions);
pTrans->prepareActions = NULL;
}
if (pTrans->redoActions != NULL) {
mndTransDropActions(pTrans->redoActions);
pTrans->redoActions = NULL;
......@@ -549,7 +479,7 @@ static void mndTransDropData(STrans *pTrans) {
(void)taosThreadMutexDestroy(&pTrans->mutex);
}
static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans, bool callFunc) {
static int32_t mndTransDelete(SSdb *pSdb, STrans *pTrans, bool callFunc) {
mInfo("trans:%d, perform delete action, row:%p stage:%s callfunc:%d, stopFunc:%d", pTrans->id, pTrans,
mndTransStr(pTrans->stage), callFunc, pTrans->stopFunc);
......@@ -590,6 +520,7 @@ static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) {
return -1;
}
mndTransUpdateActions(pOld->prepareActions, pNew->prepareActions);
mndTransUpdateActions(pOld->redoActions, pNew->redoActions);
mndTransUpdateActions(pOld->undoActions, pNew->undoActions);
mndTransUpdateActions(pOld->commitActions, pNew->commitActions);
......@@ -646,6 +577,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict,
pTrans->conflict = conflict;
pTrans->exec = TRN_EXEC_PARALLEL;
pTrans->createdTime = taosGetTimestampMs();
pTrans->prepareActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
pTrans->redoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
pTrans->undoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
pTrans->commitActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
......@@ -728,6 +660,13 @@ int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) {
return mndTransAppendAction(pTrans->commitActions, &action);
}
int32_t mndTransAppendPrepareAction(STrans *pTrans, STransAction *pAction) {
pAction->stage = TRN_STAGE_PREPARE;
pAction->actionType = TRANS_ACTION_RAW;
pAction->mTraceId = pTrans->mTraceId;
return mndTransAppendAction(pTrans->prepareActions, pAction);
}
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction) {
pAction->stage = TRN_STAGE_REDO_ACTION;
pAction->actionType = TRANS_ACTION_MSG;
......@@ -800,7 +739,7 @@ void mndTransSetParallel(STrans *pTrans) { pTrans->exec = TRN_EXEC_PARALLEL; }
void mndTransSetOper(STrans *pTrans, EOperType oper) { pTrans->oper = oper; }
static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
SSdbRaw *pRaw = mndTransActionEncode(pTrans);
SSdbRaw *pRaw = mndTransEncode(pTrans);
if (pRaw == NULL) {
mError("trans:%d, failed to encode while sync trans since %s", pTrans->id, terrstr());
return -1;
......@@ -922,7 +861,7 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
pTrans->rpcRsp = NULL;
pTrans->rpcRspLen = 0;
mndTransExecute(pMnode, pNew, true);
mndTransExecute(pMnode, pNew);
mndReleaseTrans(pMnode, pNew);
return 0;
}
......@@ -1104,7 +1043,7 @@ int32_t mndTransProcessRsp(SRpcMsg *pRsp) {
mInfo("trans:%d, invalid action, index:%d, code:0x%x", transId, action, pRsp->code);
}
mndTransExecute(pMnode, pTrans, true);
mndTransExecute(pMnode, pTrans);
_OVER:
mndReleaseTrans(pMnode, pTrans);
......@@ -1550,7 +1489,7 @@ static bool mndTransPerformPreFinishStage(SMnode *pMnode, STrans *pTrans) {
static bool mndTransPerformFinishStage(SMnode *pMnode, STrans *pTrans) {
bool continueExec = false;
SSdbRaw *pRaw = mndTransActionEncode(pTrans);
SSdbRaw *pRaw = mndTransEncode(pTrans);
if (pRaw == NULL) {
mError("trans:%d, failed to encode while finish trans since %s", pTrans->id, terrstr());
return false;
......@@ -1567,12 +1506,12 @@ static bool mndTransPerformFinishStage(SMnode *pMnode, STrans *pTrans) {
return continueExec;
}
void mndTransExecute(SMnode *pMnode, STrans *pTrans, bool isLeader) {
void mndTransExecuteImp(SMnode *pMnode, STrans *pTrans, bool topHalf) {
bool continueExec = true;
while (continueExec) {
mInfo("trans:%d, continue to execute, stage:%s createTime:%" PRId64 " leader:%d", pTrans->id,
mndTransStr(pTrans->stage), pTrans->createdTime, isLeader);
mInfo("trans:%d, continue to execute, stage:%s createTime:%" PRId64 " topHalf:%d", pTrans->id,
mndTransStr(pTrans->stage), pTrans->createdTime, topHalf);
pTrans->lastExecTime = taosGetTimestampMs();
switch (pTrans->stage) {
case TRN_STAGE_PREPARE:
......@@ -1582,7 +1521,7 @@ void mndTransExecute(SMnode *pMnode, STrans *pTrans, bool isLeader) {
continueExec = mndTransPerformRedoActionStage(pMnode, pTrans);
break;
case TRN_STAGE_COMMIT:
if (isLeader) {
if (topHalf) {
continueExec = mndTransPerformCommitStage(pMnode, pTrans);
} else {
mInfo("trans:%d, can not commit since not leader", pTrans->id);
......@@ -1593,7 +1532,7 @@ void mndTransExecute(SMnode *pMnode, STrans *pTrans, bool isLeader) {
continueExec = mndTransPerformCommitActionStage(pMnode, pTrans);
break;
case TRN_STAGE_ROLLBACK:
if (isLeader) {
if (topHalf) {
continueExec = mndTransPerformRollbackStage(pMnode, pTrans);
} else {
mInfo("trans:%d, can not rollback since not leader", pTrans->id);
......@@ -1604,7 +1543,7 @@ void mndTransExecute(SMnode *pMnode, STrans *pTrans, bool isLeader) {
continueExec = mndTransPerformUndoActionStage(pMnode, pTrans);
break;
case TRN_STAGE_PRE_FINISH:
if (isLeader) {
if (topHalf) {
continueExec = mndTransPerformPreFinishStage(pMnode, pTrans);
} else {
mInfo("trans:%d, can not pre-finish since not leader", pTrans->id);
......@@ -1623,6 +1562,16 @@ void mndTransExecute(SMnode *pMnode, STrans *pTrans, bool isLeader) {
mndTransSendRpcRsp(pMnode, pTrans);
}
void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
bool topHalf = true;
return mndTransExecuteImp(pMnode, pTrans, topHalf);
}
void mndTransRefresh(SMnode *pMnode, STrans *pTrans) {
bool topHalf = false;
return mndTransExecuteImp(pMnode, pTrans, topHalf);
}
static int32_t mndProcessTransTimer(SRpcMsg *pReq) {
mTrace("start to process trans timer");
mndTransPullup(pReq->info.node);
......@@ -1649,7 +1598,7 @@ int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans) {
pAction->errCode = 0;
}
mndTransExecute(pMnode, pTrans, true);
mndTransExecute(pMnode, pTrans);
return 0;
}
......@@ -1707,7 +1656,7 @@ void mndTransPullup(SMnode *pMnode) {
int32_t *pTransId = taosArrayGet(pArray, i);
STrans *pTrans = mndAcquireTrans(pMnode, *pTransId);
if (pTrans != NULL) {
mndTransExecute(pMnode, pTrans, true);
mndTransExecute(pMnode, pTrans);
}
mndReleaseTrans(pMnode, pTrans);
}
......
......@@ -2255,11 +2255,13 @@ int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgro
SDbObj dbObj = {0};
SArray *pArray = mndBuildDnodesArray(pMnode, 0);
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "split-vgroup");
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "split-vgroup");
if (pTrans == NULL) goto _OVER;
mndTransSetSerial(pTrans);
mInfo("trans:%d, used to split vgroup, vgId:%d", pTrans->id, pVgroup->vgId);
mndTransSetDbName(pTrans, pDb->name, NULL);
SVgObj newVg1 = {0};
memcpy(&newVg1, pVgroup, sizeof(SVgObj));
mInfo("vgId:%d, vgroup info before split, replica:%d hashBegin:%u hashEnd:%u", newVg1.vgId, newVg1.replica,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册