提交 e0e1fb31 编写于 作者: S Shengliang Guan

refactor: make trans can exec one by one

上级 84b3de32
...@@ -123,6 +123,11 @@ typedef enum { ...@@ -123,6 +123,11 @@ typedef enum {
TRN_POLICY_RETRY = 1, TRN_POLICY_RETRY = 1,
} ETrnPolicy; } ETrnPolicy;
typedef enum {
TRN_EXEC_PARALLEL = 0,
TRN_EXEC_ONE_BY_ONE = 1,
} ETrnExecType;
typedef enum { typedef enum {
DND_REASON_ONLINE = 0, DND_REASON_ONLINE = 0,
DND_REASON_STATUS_MSG_TIMEOUT, DND_REASON_STATUS_MSG_TIMEOUT,
...@@ -151,6 +156,7 @@ typedef struct { ...@@ -151,6 +156,7 @@ typedef struct {
ETrnStage stage; ETrnStage stage;
ETrnPolicy policy; ETrnPolicy policy;
ETrnType type; ETrnType type;
ETrnExecType parallel;
int32_t code; int32_t code;
int32_t failedTimes; int32_t failedTimes;
SRpcHandleInfo rpcInfo; SRpcHandleInfo rpcInfo;
......
...@@ -57,6 +57,7 @@ int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction); ...@@ -57,6 +57,7 @@ int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen); void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
void mndTransSetCb(STrans *pTrans, ETrnFuncType startFunc, ETrnFuncType stopFunc, void *param, int32_t paramLen); void mndTransSetCb(STrans *pTrans, ETrnFuncType startFunc, ETrnFuncType stopFunc, void *param, int32_t paramLen);
void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb); void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb);
void mndTransSetExecOneByOne(STrans *pTrans);
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans); int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
void mndTransProcessRsp(SRpcMsg *pRsp); void mndTransProcessRsp(SRpcMsg *pRsp);
......
...@@ -313,16 +313,16 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno ...@@ -313,16 +313,16 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno
memcpy(createEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN); memcpy(createEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
{ {
int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &alterReq); int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &createReq);
void *pReq = taosMemoryMalloc(contLen); void *pReq = taosMemoryMalloc(contLen);
tSerializeSDCreateMnodeReq(pReq, contLen, &alterReq); tSerializeSDCreateMnodeReq(pReq, contLen, &createReq);
STransAction action = { STransAction action = {
.epSet = alterEpset, .epSet = createEpset,
.pCont = pReq, .pCont = pReq,
.contLen = contLen, .contLen = contLen,
.msgType = TDMT_DND_ALTER_MNODE, .msgType = TDMT_DND_CREATE_MNODE,
.acceptableCode = 0, .acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED,
}; };
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
...@@ -332,16 +332,16 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno ...@@ -332,16 +332,16 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno
} }
{ {
int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &createReq); int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &alterReq);
void *pReq = taosMemoryMalloc(contLen); void *pReq = taosMemoryMalloc(contLen);
tSerializeSDCreateMnodeReq(pReq, contLen, &createReq); tSerializeSDCreateMnodeReq(pReq, contLen, &alterReq);
STransAction action = { STransAction action = {
.epSet = createEpset, .epSet = alterEpset,
.pCont = pReq, .pCont = pReq,
.contLen = contLen, .contLen = contLen,
.msgType = TDMT_DND_CREATE_MNODE, .msgType = TDMT_DND_ALTER_MNODE,
.acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED, .acceptableCode = 0,
}; };
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
...@@ -368,6 +368,7 @@ static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, ...@@ -368,6 +368,7 @@ static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER; if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER; if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) goto _OVER; if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) goto _OVER;
mndTransSetExecOneByOne(pTrans);
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
...@@ -541,6 +542,7 @@ static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) { ...@@ -541,6 +542,7 @@ static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) {
if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) goto _OVER; if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) goto _OVER;
if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) goto _OVER; if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
mndTransSetExecOneByOne(pTrans);
code = 0; code = 0;
......
...@@ -507,6 +507,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea ...@@ -507,6 +507,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
mDebug("trans:%d, used to create sma:%s", pTrans->id, pCreate->name); mDebug("trans:%d, used to create sma:%s", pTrans->id, pCreate->name);
mndTransSetDbInfo(pTrans, pDb); mndTransSetDbInfo(pTrans, pDb);
mndTransSetExecOneByOne(pTrans);
if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER; if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;
......
...@@ -140,6 +140,7 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { ...@@ -140,6 +140,7 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
SDB_SET_INT16(pRaw, dataPos, stage, _OVER) SDB_SET_INT16(pRaw, dataPos, stage, _OVER)
SDB_SET_INT16(pRaw, dataPos, pTrans->policy, _OVER) SDB_SET_INT16(pRaw, dataPos, pTrans->policy, _OVER)
SDB_SET_INT16(pRaw, dataPos, pTrans->type, _OVER) SDB_SET_INT16(pRaw, dataPos, pTrans->type, _OVER)
SDB_SET_INT16(pRaw, dataPos, pTrans->parallel, _OVER)
SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER) SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER)
SDB_SET_INT64(pRaw, dataPos, pTrans->dbUid, _OVER) SDB_SET_INT64(pRaw, dataPos, pTrans->dbUid, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER) SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER)
...@@ -245,12 +246,15 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { ...@@ -245,12 +246,15 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
int16_t stage = 0; int16_t stage = 0;
int16_t policy = 0; int16_t policy = 0;
int16_t type = 0; int16_t type = 0;
int16_t parallel = 0;
SDB_GET_INT16(pRaw, dataPos, &stage, _OVER) SDB_GET_INT16(pRaw, dataPos, &stage, _OVER)
SDB_GET_INT16(pRaw, dataPos, &policy, _OVER) SDB_GET_INT16(pRaw, dataPos, &policy, _OVER)
SDB_GET_INT16(pRaw, dataPos, &type, _OVER) SDB_GET_INT16(pRaw, dataPos, &type, _OVER)
SDB_GET_INT16(pRaw, dataPos, &parallel, _OVER)
pTrans->stage = stage; pTrans->stage = stage;
pTrans->policy = policy; pTrans->policy = policy;
pTrans->type = type; pTrans->type = type;
pTrans->parallel = parallel;
SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER) SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pTrans->dbUid, _OVER) SDB_GET_INT64(pRaw, dataPos, &pTrans->dbUid, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER) SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER)
...@@ -665,6 +669,8 @@ void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb) { ...@@ -665,6 +669,8 @@ void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb) {
memcpy(pTrans->dbname, pDb->name, TSDB_DB_FNAME_LEN); memcpy(pTrans->dbname, pDb->name, TSDB_DB_FNAME_LEN);
} }
void mndTransSetExecOneByOne(STrans *pTrans) { pTrans->parallel = TRN_EXEC_ONE_BY_ONE; }
static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
SSdbRaw *pRaw = mndTransActionEncode(pTrans); SSdbRaw *pRaw = mndTransActionEncode(pTrans);
if (pRaw == NULL) { if (pRaw == NULL) {
...@@ -970,7 +976,18 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr ...@@ -970,7 +976,18 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr
for (int32_t action = 0; action < numOfActions; ++action) { for (int32_t action = 0; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pArray, action); STransAction *pAction = taosArrayGet(pArray, action);
if (pAction == NULL) continue; if (pAction == NULL) continue;
if (pAction->msgSent) continue;
if (pAction->msgSent) {
if (pAction->msgReceived) {
continue;
} else {
if (pTrans->parallel == TRN_EXEC_ONE_BY_ONE) {
break;
} else {
continue;
}
}
}
int64_t signature = pTrans->id; int64_t signature = pTrans->id;
signature = (signature << 32); signature = (signature << 32);
...@@ -990,6 +1007,9 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr ...@@ -990,6 +1007,9 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr
pAction->msgSent = 1; pAction->msgSent = 1;
pAction->msgReceived = 0; pAction->msgReceived = 0;
pAction->errCode = 0; pAction->errCode = 0;
if (pTrans->parallel == TRN_EXEC_ONE_BY_ONE) {
break;
}
} else { } else {
pAction->msgSent = 0; pAction->msgSent = 0;
pAction->msgReceived = 0; pAction->msgReceived = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册