提交 18ca6d93 编写于 作者: S Shengliang Guan

refactor: make trans support multi steps

上级 86f8bf6c
......@@ -129,7 +129,7 @@ typedef enum {
typedef enum {
TRN_EXEC_PARALLEL = 0,
TRN_EXEC_ONE_BY_ONE = 1,
TRN_EXEC_NO_PARALLEL = 1,
} ETrnExecType;
typedef enum {
......
......@@ -62,7 +62,7 @@ int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *param, int32_t paramLen);
void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb);
void mndTransSetExecOneByOne(STrans *pTrans);
void mndTransSetNoParallel(STrans *pTrans);
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
void mndTransProcessRsp(SRpcMsg *pRsp);
......
......@@ -367,7 +367,7 @@ static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId);
mndTransSetExecOneByOne(pTrans);
mndTransSetNoParallel(pTrans);
if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) goto _OVER;
......@@ -539,7 +539,7 @@ static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) {
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id);
mndTransSetExecOneByOne(pTrans);
mndTransSetNoParallel(pTrans);
if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) goto _OVER;
if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) goto _OVER;
if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) goto _OVER;
......
......@@ -507,7 +507,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
mDebug("trans:%d, used to create sma:%s", pTrans->id, pCreate->name);
mndTransSetDbInfo(pTrans, pDb);
mndTransSetExecOneByOne(pTrans);
mndTransSetNoParallel(pTrans);
if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;
......
......@@ -120,8 +120,8 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
SDB_SET_INT16(pRaw, dataPos, pTrans->type, _OVER)
SDB_SET_INT16(pRaw, dataPos, pTrans->parallel, _OVER)
SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER)
SDB_SET_INT64(pRaw, dataPos, pTrans->dbUid, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER)
SDB_SET_INT32(pRaw, dataPos, pTrans->redoActionPos, _OVER)
int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions);
int32_t undoActionNum = taosArrayGetSize(pTrans->undoActions);
......@@ -261,8 +261,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
pTrans->type = type;
pTrans->parallel = parallel;
SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pTrans->dbUid, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pTrans->redoActionPos, _OVER)
SDB_GET_INT32(pRaw, dataPos, &redoActionNum, _OVER)
SDB_GET_INT32(pRaw, dataPos, &undoActionNum, _OVER)
SDB_GET_INT32(pRaw, dataPos, &commitActionNum, _OVER)
......@@ -567,6 +567,7 @@ static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) {
mndTransUpdateActions(pOld->undoActions, pNew->undoActions);
mndTransUpdateActions(pOld->commitActions, pNew->commitActions);
pOld->stage = pNew->stage;
pOld->redoActionPos = pNew->redoActionPos;
if (pOld->stage == TRN_STAGE_COMMIT) {
pOld->stage = TRN_STAGE_COMMIT_ACTION;
......@@ -694,11 +695,10 @@ void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *
}
void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb) {
pTrans->dbUid = pDb->uid;
memcpy(pTrans->dbname, pDb->name, TSDB_DB_FNAME_LEN);
}
void mndTransSetExecOneByOne(STrans *pTrans) { pTrans->parallel = TRN_EXEC_ONE_BY_ONE; }
void mndTransSetNoParallel(STrans *pTrans) { pTrans->parallel = TRN_EXEC_NO_PARALLEL; }
static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
SSdbRaw *pRaw = mndTransActionEncode(pTrans);
......@@ -708,7 +708,7 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
}
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mDebug("trans:%d, sync to other nodes", pTrans->id);
mDebug("trans:%d, sync to other mnodes", pTrans->id);
int32_t code = mndSyncPropose(pMnode, pRaw, pTrans->id);
if (code != 0) {
mError("trans:%d, failed to sync since %s", pTrans->id, terrstr());
......@@ -761,7 +761,7 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNewTrans) {
mError("trans:%d, can't execute since trans:%d in progress", pNewTrans->id, pTrans->id);
conflict = true;
} else if (mndIsDbTrans(pTrans) || mndIsStbTrans(pTrans)) {
if (pNewTrans->dbUid == pTrans->dbUid) {
if (strcmp(pNewTrans->dbname, pTrans->dbname) == 0) {
mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname);
conflict = true;
}
......@@ -774,7 +774,7 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNewTrans) {
mError("trans:%d, can't execute since trans:%d in progress", pNewTrans->id, pTrans->id);
conflict = true;
} else if (mndIsDbTrans(pTrans)) {
if (pNewTrans->dbUid == pTrans->dbUid) {
if (strcmp(pNewTrans->dbname, pTrans->dbname) == 0) {
mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname);
conflict = true;
}
......@@ -856,7 +856,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
}
if (pTrans->policy == TRN_POLICY_ROLLBACK) {
if (pTrans->stage == pTrans->stage == TRN_STAGE_UNDO_ACTION || pTrans->stage == TRN_STAGE_ROLLBACK) {
if (pTrans->stage == TRN_STAGE_UNDO_ACTION || pTrans->stage == TRN_STAGE_ROLLBACK) {
if (code == 0) code = TSDB_CODE_MND_TRANS_UNKNOW_ERROR;
sendRsp = true;
}
......@@ -876,12 +876,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
mDebug("trans:%d, send rsp, code:0x%x stage:%s app:%p", pTrans->id, code, mndTransStr(pTrans->stage),
pTrans->rpcInfo.ahandle);
SRpcMsg rspMsg = {
.code = code,
.pCont = rpcCont,
.contLen = pTrans->rpcRspLen,
.info = pTrans->rpcInfo,
};
SRpcMsg rspMsg = {.code = code, .pCont = rpcCont, .contLen = pTrans->rpcRspLen, .info = pTrans->rpcInfo};
tmsgSendRsp(&rspMsg);
pTrans->rpcInfo.handle = NULL;
pTrans->rpcRsp = NULL;
......@@ -944,7 +939,6 @@ static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray)
for (int32_t action = 0; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pArray, action);
if (pAction == NULL) continue;
if (pAction->msgSent && pAction->msgReceived &&
(pAction->errCode == 0 || pAction->errCode == pAction->acceptableCode))
continue;
......@@ -1017,16 +1011,14 @@ static int32_t mndTransExecSingleAction(SMnode *pMnode, STrans *pTrans, STransAc
}
}
static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pArray) {
static int32_t mndTransExecSingleActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) {
int32_t numOfActions = taosArrayGetSize(pArray);
int32_t code = 0;
for (int32_t action = 0; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pArray, action);
code = mndTransExecSingleAction(pMnode, pTrans, pAction);
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
break;
}
if (code != 0) break;
}
return code;
......@@ -1036,7 +1028,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
int32_t numOfActions = taosArrayGetSize(pArray);
if (numOfActions == 0) return 0;
if (mndTransSendActionMsg(pMnode, pTrans, pArray) != 0) {
if (mndTransExecSingleActions(pMnode, pTrans, pArray) != 0) {
return -1;
}
......@@ -1044,8 +1036,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
int32_t errCode = 0;
for (int32_t action = 0; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pArray, action);
if (pAction == NULL) continue;
if ((pAction->msgSent && pAction->msgReceived) || pAction->rawWritten) {
if (pAction->msgReceived || pAction->rawWritten) {
numOfExecuted++;
if (pAction->errCode != 0 && pAction->errCode != pAction->acceptableCode) {
errCode = pAction->errCode;
......@@ -1087,40 +1078,65 @@ static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) {
static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans) {
int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->commitActions);
if (code != 0) {
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("failed to execute commitActions since %s", terrstr());
}
return code;
}
static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) {
bool continueExec = true;
pTrans->stage = TRN_STAGE_REDO_ACTION;
mDebug("trans:%d, stage from prepare to redoAction", pTrans->id);
return continueExec;
}
static bool mndTransExecuteRedoActionsOneByOne(SMnode *pMnode, STrans *pTrans) {
bool continueExec = true;
if (pTrans->redoActionPos >= taosArrayGetSize(pTrans->redoActions)) return continueExec;
static int32_t mndTransExecuteRedoActionsNoParallel(SMnode *pMnode, STrans *pTrans) {
int32_t code = 0;
int32_t numOfActions = taosArrayGetSize(pTrans->redoActions);
if (numOfActions == 0) return code;
if (pTrans->redoActionPos >= numOfActions) return code;
for (int32_t action = pTrans->redoActionPos; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pTrans->redoActions, pTrans->redoActionPos);
int32_t code = mndTransExecSingleAction(pMnode, pTrans, pAction);
code = mndTransExecSingleAction(pMnode, pTrans, pAction);
if (code == 0) {
if (pAction->msgSent) {
if (pAction->msgReceived) {
if (pAction->errCode != 0 && pAction->errCode != pAction->acceptableCode) {
code = pAction->errCode;
}
} else {
code = TSDB_CODE_ACTION_IN_PROGRESS;
}
}
if (pAction->rawWritten) {
if (pAction->errCode != 0 && pAction->errCode != pAction->acceptableCode) {
code = pAction->errCode;
}
}
}
if (code == 0) {
pTrans->redoActionPos++;
mDebug("trans:%d, %s:%d is executed and need sync to other mnodes", pTrans->id, mndTransStr(pAction->stage),
pAction->id);
// todo sync these infos
code = mndTransSync(pMnode, pTrans);
if (code != 0) {
mError("trans:%d, failed to sync redoActionPos since %s", pTrans->id, terrstr());
break;
}
} else if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
mDebug("trans:%d, %s:%d is in progress and wait it finish", pTrans->id, mndTransStr(pAction->stage), pAction->id);
continueExec = false;
break;
} else {
mError("trans:%d, %s:%d failed to execute since %s", pTrans->id, mndTransStr(pAction->stage), pAction->id,
terrstr());
continueExec = false;
break;
}
}
return code;
}
static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) {
bool continueExec = true;
pTrans->stage = TRN_STAGE_REDO_ACTION;
mDebug("trans:%d, stage from prepare to redoAction", pTrans->id);
return continueExec;
}
......@@ -1128,8 +1144,8 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) {
bool continueExec = true;
int32_t code = 0;
if (pTrans->parallel == TRN_EXEC_ONE_BY_ONE) {
code = mndTransExecuteRedoActionsOneByOne(pMnode, pTrans);
if (pTrans->parallel == TRN_EXEC_NO_PARALLEL) {
code = mndTransExecuteRedoActionsNoParallel(pMnode, pTrans);
} else {
code = mndTransExecuteRedoActions(pMnode, pTrans);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册