diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 507ce7ebaf23d4ea3e36350c119d440467d0d230..202aeaf69e037b073e301bc2971c68eb539099d5 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2296,6 +2296,11 @@ typedef struct { int32_t tSerializeSMDropSmaReq(void* buf, int32_t bufLen, SMDropSmaReq* pReq); int32_t tDeserializeSMDropSmaReq(void* buf, int32_t bufLen, SMDropSmaReq* pReq); +typedef struct { + int32_t vgId; + SEpSet epSet; +} SVgEpSet; + typedef struct { int8_t version; // for compatibility(default 0) int8_t intervalUnit; // MACRO: TIME_UNIT_XXX @@ -2305,6 +2310,7 @@ typedef struct { char indexName[TSDB_INDEX_NAME_LEN]; int32_t exprLen; int32_t tagsFilterLen; + int32_t numOfVgroups; int64_t indexUid; tb_uid_t tableUid; // super/child/common table uid int64_t interval; @@ -2312,6 +2318,7 @@ typedef struct { int64_t sliding; char* expr; // sma expression char* tagsFilter; + SVgEpSet vgEpSet[]; } STSma; // Time-range-wise SMA typedef STSma SVCreateTSmaReq; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 2cf188866bd0486fe1d44f72b9cce87245160c89..012bc40c8a36fedee1c4d3ffffa1a9e663563fdd 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -152,8 +152,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_SYSTABLE_RETRIEVE, "retrieve", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GRANT, "grant", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "auth", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_CONFIRM_WRITE, "mnode-confirm-write", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_APPLY_MSG, "mnode-apply-msg", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_APPLY_MSG, "mnode-apply", NULL, NULL) TD_NEW_MSG_SEG(TDMT_VND_MSG) TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 477eff00047f5c78b7f09514cd12c21dda35c67d..9c9f33ac961a61a732cf87d26d0a8bf6cbd3a8d3 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -3661,6 +3661,7 @@ int32_t tEncodeTSma(SEncoder *pCoder, const STSma *pSma) { if (tEncodeCStr(pCoder, pSma->indexName) < 0) return -1; if (tEncodeI32(pCoder, pSma->exprLen) < 0) return -1; if (tEncodeI32(pCoder, pSma->tagsFilterLen) < 0) return -1; + if (tEncodeI32(pCoder, pSma->numOfVgroups) < 0) return -1; if (tEncodeI64(pCoder, pSma->indexUid) < 0) return -1; if (tEncodeI64(pCoder, pSma->tableUid) < 0) return -1; if (tEncodeI64(pCoder, pSma->interval) < 0) return -1; @@ -3672,7 +3673,17 @@ int32_t tEncodeTSma(SEncoder *pCoder, const STSma *pSma) { if (pSma->tagsFilterLen > 0) { if (tEncodeCStr(pCoder, pSma->tagsFilter) < 0) return -1; } - + for (int32_t v = 0; v < pSma->numOfVgroups; ++v) { + if (tEncodeI32(pCoder, pSma->vgEpSet[v].vgId) < 0) return -1; + if (tEncodeI8(pCoder, pSma->vgEpSet[v].epSet.inUse) < 0) return -1; + int8_t numOfEps = pSma->vgEpSet[v].epSet.numOfEps; + if (tEncodeI8(pCoder, numOfEps) < 0) return -1; + for (int32_t n = 0; n < numOfEps; ++n) { + const SEp *pEp = &pSma->vgEpSet[v].epSet.eps[n]; + if (tEncodeCStr(pCoder, pEp->fqdn) < 0) return -1; + if (tEncodeU16(pCoder, pEp->port) < 0) return -1; + } + } return 0; } @@ -3685,6 +3696,7 @@ int32_t tDecodeTSma(SDecoder *pCoder, STSma *pSma) { if (tDecodeCStrTo(pCoder, pSma->indexName) < 0) return -1; if (tDecodeI32(pCoder, &pSma->exprLen) < 0) return -1; if (tDecodeI32(pCoder, &pSma->tagsFilterLen) < 0) return -1; + if (tDecodeI32(pCoder, &pSma->numOfVgroups) < 0) return -1; if (tDecodeI64(pCoder, &pSma->indexUid) < 0) return -1; if (tDecodeI64(pCoder, &pSma->tableUid) < 0) return -1; if (tDecodeI64(pCoder, &pSma->interval) < 0) return -1; @@ -3700,6 +3712,17 @@ int32_t tDecodeTSma(SDecoder *pCoder, STSma *pSma) { } else { pSma->tagsFilter = NULL; } + for (int32_t v = 0; v < pSma->numOfVgroups; ++v) { + if (tDecodeI32(pCoder, &pSma->vgEpSet[v].vgId) < 0) return -1; + if (tDecodeI8(pCoder, &pSma->vgEpSet[v].epSet.inUse) < 0) return -1; + if (tDecodeI8(pCoder, &pSma->vgEpSet[v].epSet.numOfEps) < 0) return -1; + int8_t numOfEps = pSma->vgEpSet[v].epSet.numOfEps; + for (int32_t n = 0; n < numOfEps; ++n) { + SEp *pEp = &pSma->vgEpSet[v].epSet.eps[n]; + if (tDecodeCStrTo(pCoder, pEp->fqdn) < 0) return -1; + if (tDecodeU16(pCoder, &pEp->port) < 0) return -1; + } + } return 0; } diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 4a79513736fddf733feee97106cb6e9e2c048954..78ab8509640d9d24304d8c55df9408756bf28f69 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -197,7 +197,6 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIRM_WRITE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index 6d1f3710830563e24fe124a3a95582b316ef4e00..736fa3b36108e2e840b1f24a25c302262f83e6ca 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -29,12 +29,18 @@ typedef enum { TRANS_STOP_FUNC_MQ_REB = 4, } ETrnFunc; +typedef enum { + TRANS_ACTION_NULL = 0, + TRANS_ACTION_MSG = 1, + TRANS_ACTION_RAW = 2, +} ETrnAct; + typedef struct { int32_t id; int32_t errCode; int32_t acceptableCode; int8_t stage; - int8_t actionType; // 0-msg, 1-raw + ETrnAct actionType; int8_t rawWritten; int8_t msgSent; int8_t msgReceived; @@ -57,6 +63,7 @@ void mndTransDrop(STrans *pTrans); int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw); +int32_t mndTransAppendNullLog(STrans *pTrans); int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction); int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction); void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen); @@ -65,7 +72,7 @@ void mndTransSetDbName(STrans *pTrans, const char *dbname); void mndTransSetSerial(STrans *pTrans); int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans); -void mndTransProcessRsp(SRpcMsg *pRsp); +int32_t mndTransProcessRsp(SRpcMsg *pRsp); void mndTransPullup(SMnode *pMnode); int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans); diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 013a991e98d1b57c88d0cbf262877afa05d587f6..1cc832b3af9d6f09b0c99f3157cfabc111ef71cb 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -34,9 +34,6 @@ static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOld, SMnodeObj *pNe static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq); static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq); static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq); -static int32_t mndProcessCreateMnodeRsp(SRpcMsg *pRsp); -static int32_t mndProcessAlterMnodeRsp(SRpcMsg *pRsp); -static int32_t mndProcessDropMnodeRsp(SRpcMsg *pRsp); static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter); @@ -53,11 +50,11 @@ int32_t mndInitMnode(SMnode *pMnode) { }; mndSetMsgHandle(pMnode, TDMT_MND_CREATE_MNODE, mndProcessCreateMnodeReq); + mndSetMsgHandle(pMnode, TDMT_DND_CREATE_MNODE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_ALTER_MNODE, mndProcessAlterMnodeReq); + mndSetMsgHandle(pMnode, TDMT_MND_ALTER_MNODE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_DROP_MNODE, mndProcessDropMnodeReq); - mndSetMsgHandle(pMnode, TDMT_DND_CREATE_MNODE_RSP, mndProcessCreateMnodeRsp); - mndSetMsgHandle(pMnode, TDMT_MND_ALTER_MNODE_RSP, mndProcessAlterMnodeRsp); - mndSetMsgHandle(pMnode, TDMT_DND_DROP_MNODE_RSP, mndProcessDropMnodeRsp); + mndSetMsgHandle(pMnode, TDMT_DND_DROP_MNODE_RSP, mndTransProcessRsp); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndRetrieveMnodes); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndCancelGetNextMnode); @@ -367,7 +364,7 @@ static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER; if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER; if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) goto _OVER; - + if (mndTransAppendNullLog(pTrans) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; code = 0; @@ -549,6 +546,7 @@ static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) { if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) goto _OVER; if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) goto _OVER; if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) goto _OVER; + if (mndTransAppendNullLog(pTrans) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; code = 0; @@ -616,21 +614,6 @@ _OVER: return code; } -static int32_t mndProcessCreateMnodeRsp(SRpcMsg *pRsp) { - mndTransProcessRsp(pRsp); - return 0; -} - -static int32_t mndProcessAlterMnodeRsp(SRpcMsg *pRsp) { - mndTransProcessRsp(pRsp); - return 0; -} - -static int32_t mndProcessDropMnodeRsp(SRpcMsg *pRsp) { - mndTransProcessRsp(pRsp); - return 0; -} - static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index bbee59090d1600693478d38fec9ff47082bcc032..cc9cc6edc284438023b2410e1f5a9d1bc716727b 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -88,12 +88,14 @@ static int32_t mndTransGetActionsSize(SArray *pArray) { for (int32_t i = 0; i < actionNum; ++i) { STransAction *pAction = taosArrayGet(pArray, i); - if (pAction->actionType) { + if (pAction->actionType == TRANS_ACTION_RAW) { rawDataLen += (sdbGetRawTotalSize(pAction->pRaw) + sizeof(int32_t)); - } else { + } else if (pAction->actionType == TRANS_ACTION_MSG) { rawDataLen += (sizeof(STransAction) + pAction->contLen); + } else { + // empty } - rawDataLen += sizeof(pAction->actionType); + rawDataLen += sizeof(int8_t); } return rawDataLen; @@ -137,18 +139,20 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER) SDB_SET_INT8(pRaw, dataPos, pAction->actionType, _OVER) SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER) - if (pAction->actionType) { + if (pAction->actionType == TRANS_ACTION_RAW) { int32_t len = sdbGetRawTotalSize(pAction->pRaw); SDB_SET_INT8(pRaw, dataPos, pAction->rawWritten, _OVER) SDB_SET_INT32(pRaw, dataPos, len, _OVER) SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER) - } else { + } else if (pAction->actionType == TRANS_ACTION_MSG) { SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER) SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER) SDB_SET_INT8(pRaw, dataPos, pAction->msgSent, _OVER) SDB_SET_INT8(pRaw, dataPos, pAction->msgReceived, _OVER) SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER) SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER) + } else { + // nothing } } @@ -159,18 +163,20 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER) SDB_SET_INT8(pRaw, dataPos, pAction->actionType, _OVER) SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER) - if (pAction->actionType) { + if (pAction->actionType == TRANS_ACTION_RAW) { int32_t len = sdbGetRawTotalSize(pAction->pRaw); SDB_SET_INT8(pRaw, dataPos, pAction->rawWritten, _OVER) SDB_SET_INT32(pRaw, dataPos, len, _OVER) SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER) - } else { + } else if (pAction->actionType == TRANS_ACTION_MSG) { SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER) SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER) SDB_SET_INT8(pRaw, dataPos, pAction->msgSent, _OVER) SDB_SET_INT8(pRaw, dataPos, pAction->msgReceived, _OVER) SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER) SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER) + } else { + // nothing } } @@ -181,18 +187,20 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER) SDB_SET_INT8(pRaw, dataPos, pAction->actionType, _OVER) SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER) - if (pAction->actionType) { + if (pAction->actionType == TRANS_ACTION_RAW) { int32_t len = sdbGetRawTotalSize(pAction->pRaw); SDB_SET_INT8(pRaw, dataPos, pAction->rawWritten, _OVER) SDB_SET_INT32(pRaw, dataPos, len, _OVER) SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER) - } else { + } else if (pAction->actionType == TRANS_ACTION_MSG) { SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER) SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER) SDB_SET_INT8(pRaw, dataPos, pAction->msgSent, _OVER) SDB_SET_INT8(pRaw, dataPos, pAction->msgReceived, _OVER) SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER) SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER) + } else { + // nothing } } @@ -252,6 +260,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { int16_t policy = 0; int16_t conflict = 0; int16_t exec = 0; + int8_t actionType = 0; SDB_GET_INT16(pRaw, dataPos, &stage, _OVER) SDB_GET_INT16(pRaw, dataPos, &policy, _OVER) SDB_GET_INT16(pRaw, dataPos, &conflict, _OVER) @@ -279,9 +288,10 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER) SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER) SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER) - SDB_GET_INT8(pRaw, dataPos, &action.actionType, _OVER) + SDB_GET_INT8(pRaw, dataPos, &actionType, _OVER) + action.actionType = actionType; SDB_GET_INT8(pRaw, dataPos, &action.stage, _OVER) - if (action.actionType) { + if (action.actionType == TRANS_ACTION_RAW) { SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER) SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER) action.pRaw = taosMemoryMalloc(dataLen); @@ -290,7 +300,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER); if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER; action.pRaw = NULL; - } else { + } else if (action.actionType == TRANS_ACTION_MSG) { SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER); SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER) SDB_GET_INT8(pRaw, dataPos, &action.msgSent, _OVER) @@ -301,6 +311,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER); if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER; action.pCont = NULL; + } else { + if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER; } } @@ -308,9 +320,10 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER) SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER) SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER) - SDB_GET_INT8(pRaw, dataPos, &action.actionType, _OVER) + SDB_GET_INT8(pRaw, dataPos, &actionType, _OVER) + action.actionType = actionType; SDB_GET_INT8(pRaw, dataPos, &action.stage, _OVER) - if (action.actionType) { + if (action.actionType == TRANS_ACTION_RAW) { SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER) SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER) action.pRaw = taosMemoryMalloc(dataLen); @@ -319,7 +332,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER); if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto _OVER; action.pRaw = NULL; - } else { + } else if (action.actionType == TRANS_ACTION_MSG) { SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER); SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER) SDB_GET_INT8(pRaw, dataPos, &action.msgSent, _OVER) @@ -330,6 +343,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER); if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto _OVER; action.pCont = NULL; + } else { + if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER; } } @@ -337,7 +352,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER) SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER) SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER) - SDB_GET_INT8(pRaw, dataPos, &action.actionType, _OVER) + SDB_GET_INT8(pRaw, dataPos, &actionType, _OVER) + action.actionType = actionType; SDB_GET_INT8(pRaw, dataPos, &action.stage, _OVER) if (action.actionType) { SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER) @@ -348,7 +364,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER); if (taosArrayPush(pTrans->commitActions, &action) == NULL) goto _OVER; action.pRaw = NULL; - } else { + } else if (action.actionType == TRANS_ACTION_MSG) { SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER); SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER) SDB_GET_INT8(pRaw, dataPos, &action.msgSent, _OVER) @@ -359,6 +375,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER); if (taosArrayPush(pTrans->commitActions, &action) == NULL) goto _OVER; action.pCont = NULL; + } else { + if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER; } } @@ -552,10 +570,12 @@ static void mndTransDropActions(SArray *pArray) { int32_t size = taosArrayGetSize(pArray); for (int32_t i = 0; i < size; ++i) { STransAction *pAction = taosArrayGet(pArray, i); - if (pAction->actionType) { + if (pAction->actionType == TRANS_ACTION_RAW) { taosMemoryFreeClear(pAction->pRaw); - } else { + } else if (pAction->actionType == TRANS_ACTION_MSG) { taosMemoryFreeClear(pAction->pCont); + } else { + // nothing } } @@ -583,27 +603,34 @@ static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) { } int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) { - STransAction action = {.stage = TRN_STAGE_REDO_ACTION, .actionType = true, .pRaw = pRaw}; + STransAction action = {.stage = TRN_STAGE_REDO_ACTION, .actionType = TRANS_ACTION_RAW, .pRaw = pRaw}; + return mndTransAppendAction(pTrans->redoActions, &action); +} + +int32_t mndTransAppendNullLog(STrans *pTrans) { + STransAction action = {.stage = TRN_STAGE_REDO_ACTION, .actionType = TRANS_ACTION_NULL}; return mndTransAppendAction(pTrans->redoActions, &action); } int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw) { - STransAction action = {.stage = TRN_STAGE_UNDO_ACTION, .actionType = true, .pRaw = pRaw}; + STransAction action = {.stage = TRN_STAGE_UNDO_ACTION, .actionType = TRANS_ACTION_RAW, .pRaw = pRaw}; return mndTransAppendAction(pTrans->undoActions, &action); } int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) { - STransAction action = {.stage = TRN_STAGE_COMMIT_ACTION, .actionType = true, .pRaw = pRaw}; + STransAction action = {.stage = TRN_STAGE_COMMIT_ACTION, .actionType = TRANS_ACTION_RAW, .pRaw = pRaw}; return mndTransAppendAction(pTrans->commitActions, &action); } int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction) { pAction->stage = TRN_STAGE_REDO_ACTION; + pAction->actionType = TRANS_ACTION_MSG; return mndTransAppendAction(pTrans->redoActions, pAction); } int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction) { pAction->stage = TRN_STAGE_UNDO_ACTION; + pAction->actionType = TRANS_ACTION_MSG; return mndTransAppendAction(pTrans->undoActions, pAction); } @@ -782,7 +809,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { } } -void mndTransProcessRsp(SRpcMsg *pRsp) { +int32_t mndTransProcessRsp(SRpcMsg *pRsp) { SMnode *pMnode = pRsp->info.node; int64_t signature = (int64_t)(pRsp->info.ahandle); int32_t transId = (int32_t)(signature >> 32); @@ -827,6 +854,7 @@ void mndTransProcessRsp(SRpcMsg *pRsp) { _OVER: mndReleaseTrans(pMnode, pTrans); + return 0; } static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) { @@ -899,10 +927,15 @@ static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransActio } static int32_t mndTransExecSingleAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { - if (pAction->actionType) { + if (pAction->actionType == TRANS_ACTION_RAW) { return mndTransWriteSingleLog(pMnode, pTrans, pAction); - } else { + } else if (pAction->actionType == TRANS_ACTION_MSG) { return mndTransSendSingleMsg(pMnode, pTrans, pAction); + } else { + pAction->rawWritten = 0; + pAction->errCode = 0; + mDebug("trans:%d, %s:%d null action executed", pTrans->id, mndTransStr(pAction->stage), pAction->id); + return 0; } }