提交 5b2637e5 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/row_refact

......@@ -2296,6 +2296,11 @@ typedef struct {
int32_t tSerializeSMDropSmaReq(void* buf, int32_t bufLen, SMDropSmaReq* pReq);
int32_t tDeserializeSMDropSmaReq(void* buf, int32_t bufLen, SMDropSmaReq* pReq);
typedef struct {
int32_t vgId;
SEpSet epSet;
} SVgEpSet;
typedef struct {
int8_t version; // for compatibility(default 0)
int8_t intervalUnit; // MACRO: TIME_UNIT_XXX
......@@ -2305,6 +2310,7 @@ typedef struct {
char indexName[TSDB_INDEX_NAME_LEN];
int32_t exprLen;
int32_t tagsFilterLen;
int32_t numOfVgroups;
int64_t indexUid;
tb_uid_t tableUid; // super/child/common table uid
int64_t interval;
......@@ -2312,6 +2318,7 @@ typedef struct {
int64_t sliding;
char* expr; // sma expression
char* tagsFilter;
SVgEpSet vgEpSet[];
} STSma; // Time-range-wise SMA
typedef STSma SVCreateTSmaReq;
......
......@@ -152,8 +152,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_SYSTABLE_RETRIEVE, "retrieve", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GRANT, "grant", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "auth", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CONFIRM_WRITE, "mnode-confirm-write", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_APPLY_MSG, "mnode-apply-msg", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_APPLY_MSG, "mnode-apply", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_VND_MSG)
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp)
......
......@@ -3661,6 +3661,7 @@ int32_t tEncodeTSma(SEncoder *pCoder, const STSma *pSma) {
if (tEncodeCStr(pCoder, pSma->indexName) < 0) return -1;
if (tEncodeI32(pCoder, pSma->exprLen) < 0) return -1;
if (tEncodeI32(pCoder, pSma->tagsFilterLen) < 0) return -1;
if (tEncodeI32(pCoder, pSma->numOfVgroups) < 0) return -1;
if (tEncodeI64(pCoder, pSma->indexUid) < 0) return -1;
if (tEncodeI64(pCoder, pSma->tableUid) < 0) return -1;
if (tEncodeI64(pCoder, pSma->interval) < 0) return -1;
......@@ -3672,7 +3673,17 @@ int32_t tEncodeTSma(SEncoder *pCoder, const STSma *pSma) {
if (pSma->tagsFilterLen > 0) {
if (tEncodeCStr(pCoder, pSma->tagsFilter) < 0) return -1;
}
for (int32_t v = 0; v < pSma->numOfVgroups; ++v) {
if (tEncodeI32(pCoder, pSma->vgEpSet[v].vgId) < 0) return -1;
if (tEncodeI8(pCoder, pSma->vgEpSet[v].epSet.inUse) < 0) return -1;
int8_t numOfEps = pSma->vgEpSet[v].epSet.numOfEps;
if (tEncodeI8(pCoder, numOfEps) < 0) return -1;
for (int32_t n = 0; n < numOfEps; ++n) {
const SEp *pEp = &pSma->vgEpSet[v].epSet.eps[n];
if (tEncodeCStr(pCoder, pEp->fqdn) < 0) return -1;
if (tEncodeU16(pCoder, pEp->port) < 0) return -1;
}
}
return 0;
}
......@@ -3685,6 +3696,7 @@ int32_t tDecodeTSma(SDecoder *pCoder, STSma *pSma) {
if (tDecodeCStrTo(pCoder, pSma->indexName) < 0) return -1;
if (tDecodeI32(pCoder, &pSma->exprLen) < 0) return -1;
if (tDecodeI32(pCoder, &pSma->tagsFilterLen) < 0) return -1;
if (tDecodeI32(pCoder, &pSma->numOfVgroups) < 0) return -1;
if (tDecodeI64(pCoder, &pSma->indexUid) < 0) return -1;
if (tDecodeI64(pCoder, &pSma->tableUid) < 0) return -1;
if (tDecodeI64(pCoder, &pSma->interval) < 0) return -1;
......@@ -3700,6 +3712,17 @@ int32_t tDecodeTSma(SDecoder *pCoder, STSma *pSma) {
} else {
pSma->tagsFilter = NULL;
}
for (int32_t v = 0; v < pSma->numOfVgroups; ++v) {
if (tDecodeI32(pCoder, &pSma->vgEpSet[v].vgId) < 0) return -1;
if (tDecodeI8(pCoder, &pSma->vgEpSet[v].epSet.inUse) < 0) return -1;
if (tDecodeI8(pCoder, &pSma->vgEpSet[v].epSet.numOfEps) < 0) return -1;
int8_t numOfEps = pSma->vgEpSet[v].epSet.numOfEps;
for (int32_t n = 0; n < numOfEps; ++n) {
SEp *pEp = &pSma->vgEpSet[v].epSet.eps[n];
if (tDecodeCStrTo(pCoder, pEp->fqdn) < 0) return -1;
if (tDecodeU16(pCoder, &pEp->port) < 0) return -1;
}
}
return 0;
}
......
......@@ -197,7 +197,6 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIRM_WRITE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
......
......@@ -29,12 +29,18 @@ typedef enum {
TRANS_STOP_FUNC_MQ_REB = 4,
} ETrnFunc;
typedef enum {
TRANS_ACTION_NULL = 0,
TRANS_ACTION_MSG = 1,
TRANS_ACTION_RAW = 2,
} ETrnAct;
typedef struct {
int32_t id;
int32_t errCode;
int32_t acceptableCode;
int8_t stage;
int8_t actionType; // 0-msg, 1-raw
ETrnAct actionType;
int8_t rawWritten;
int8_t msgSent;
int8_t msgReceived;
......@@ -57,6 +63,7 @@ void mndTransDrop(STrans *pTrans);
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendNullLog(STrans *pTrans);
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction);
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
......@@ -65,7 +72,7 @@ void mndTransSetDbName(STrans *pTrans, const char *dbname);
void mndTransSetSerial(STrans *pTrans);
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
void mndTransProcessRsp(SRpcMsg *pRsp);
int32_t mndTransProcessRsp(SRpcMsg *pRsp);
void mndTransPullup(SMnode *pMnode);
int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans);
......
......@@ -34,9 +34,6 @@ static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOld, SMnodeObj *pNe
static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq);
static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq);
static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq);
static int32_t mndProcessCreateMnodeRsp(SRpcMsg *pRsp);
static int32_t mndProcessAlterMnodeRsp(SRpcMsg *pRsp);
static int32_t mndProcessDropMnodeRsp(SRpcMsg *pRsp);
static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter);
......@@ -53,11 +50,11 @@ int32_t mndInitMnode(SMnode *pMnode) {
};
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_MNODE, mndProcessCreateMnodeReq);
mndSetMsgHandle(pMnode, TDMT_DND_CREATE_MNODE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_MNODE, mndProcessAlterMnodeReq);
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_MNODE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_MNODE, mndProcessDropMnodeReq);
mndSetMsgHandle(pMnode, TDMT_DND_CREATE_MNODE_RSP, mndProcessCreateMnodeRsp);
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_MNODE_RSP, mndProcessAlterMnodeRsp);
mndSetMsgHandle(pMnode, TDMT_DND_DROP_MNODE_RSP, mndProcessDropMnodeRsp);
mndSetMsgHandle(pMnode, TDMT_DND_DROP_MNODE_RSP, mndTransProcessRsp);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndRetrieveMnodes);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndCancelGetNextMnode);
......@@ -367,7 +364,7 @@ static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) goto _OVER;
if (mndTransAppendNullLog(pTrans) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0;
......@@ -549,6 +546,7 @@ static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) {
if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) goto _OVER;
if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) goto _OVER;
if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) goto _OVER;
if (mndTransAppendNullLog(pTrans) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0;
......@@ -616,21 +614,6 @@ _OVER:
return code;
}
static int32_t mndProcessCreateMnodeRsp(SRpcMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndProcessAlterMnodeRsp(SRpcMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndProcessDropMnodeRsp(SRpcMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
......
......@@ -88,12 +88,14 @@ static int32_t mndTransGetActionsSize(SArray *pArray) {
for (int32_t i = 0; i < actionNum; ++i) {
STransAction *pAction = taosArrayGet(pArray, i);
if (pAction->actionType) {
if (pAction->actionType == TRANS_ACTION_RAW) {
rawDataLen += (sdbGetRawTotalSize(pAction->pRaw) + sizeof(int32_t));
} else {
} else if (pAction->actionType == TRANS_ACTION_MSG) {
rawDataLen += (sizeof(STransAction) + pAction->contLen);
} else {
// empty
}
rawDataLen += sizeof(pAction->actionType);
rawDataLen += sizeof(int8_t);
}
return rawDataLen;
......@@ -137,18 +139,20 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->actionType, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER)
if (pAction->actionType) {
if (pAction->actionType == TRANS_ACTION_RAW) {
int32_t len = sdbGetRawTotalSize(pAction->pRaw);
SDB_SET_INT8(pRaw, dataPos, pAction->rawWritten, _OVER)
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER)
} else {
} else if (pAction->actionType == TRANS_ACTION_MSG) {
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER)
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->msgSent, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->msgReceived, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER)
} else {
// nothing
}
}
......@@ -159,18 +163,20 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->actionType, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER)
if (pAction->actionType) {
if (pAction->actionType == TRANS_ACTION_RAW) {
int32_t len = sdbGetRawTotalSize(pAction->pRaw);
SDB_SET_INT8(pRaw, dataPos, pAction->rawWritten, _OVER)
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER)
} else {
} else if (pAction->actionType == TRANS_ACTION_MSG) {
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER)
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->msgSent, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->msgReceived, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER)
} else {
// nothing
}
}
......@@ -181,18 +187,20 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->actionType, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER)
if (pAction->actionType) {
if (pAction->actionType == TRANS_ACTION_RAW) {
int32_t len = sdbGetRawTotalSize(pAction->pRaw);
SDB_SET_INT8(pRaw, dataPos, pAction->rawWritten, _OVER)
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER)
} else {
} else if (pAction->actionType == TRANS_ACTION_MSG) {
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER)
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->msgSent, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->msgReceived, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER)
} else {
// nothing
}
}
......@@ -252,6 +260,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
int16_t policy = 0;
int16_t conflict = 0;
int16_t exec = 0;
int8_t actionType = 0;
SDB_GET_INT16(pRaw, dataPos, &stage, _OVER)
SDB_GET_INT16(pRaw, dataPos, &policy, _OVER)
SDB_GET_INT16(pRaw, dataPos, &conflict, _OVER)
......@@ -279,9 +288,10 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
SDB_GET_INT8(pRaw, dataPos, &action.actionType, _OVER)
SDB_GET_INT8(pRaw, dataPos, &actionType, _OVER)
action.actionType = actionType;
SDB_GET_INT8(pRaw, dataPos, &action.stage, _OVER)
if (action.actionType) {
if (action.actionType == TRANS_ACTION_RAW) {
SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER)
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
action.pRaw = taosMemoryMalloc(dataLen);
......@@ -290,7 +300,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER);
if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER;
action.pRaw = NULL;
} else {
} else if (action.actionType == TRANS_ACTION_MSG) {
SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER);
SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER)
SDB_GET_INT8(pRaw, dataPos, &action.msgSent, _OVER)
......@@ -301,6 +311,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER);
if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER;
action.pCont = NULL;
} else {
if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER;
}
}
......@@ -308,9 +320,10 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
SDB_GET_INT8(pRaw, dataPos, &action.actionType, _OVER)
SDB_GET_INT8(pRaw, dataPos, &actionType, _OVER)
action.actionType = actionType;
SDB_GET_INT8(pRaw, dataPos, &action.stage, _OVER)
if (action.actionType) {
if (action.actionType == TRANS_ACTION_RAW) {
SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER)
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
action.pRaw = taosMemoryMalloc(dataLen);
......@@ -319,7 +332,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER);
if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto _OVER;
action.pRaw = NULL;
} else {
} else if (action.actionType == TRANS_ACTION_MSG) {
SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER);
SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER)
SDB_GET_INT8(pRaw, dataPos, &action.msgSent, _OVER)
......@@ -330,6 +343,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER);
if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto _OVER;
action.pCont = NULL;
} else {
if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER;
}
}
......@@ -337,7 +352,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
SDB_GET_INT8(pRaw, dataPos, &action.actionType, _OVER)
SDB_GET_INT8(pRaw, dataPos, &actionType, _OVER)
action.actionType = actionType;
SDB_GET_INT8(pRaw, dataPos, &action.stage, _OVER)
if (action.actionType) {
SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER)
......@@ -348,7 +364,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER);
if (taosArrayPush(pTrans->commitActions, &action) == NULL) goto _OVER;
action.pRaw = NULL;
} else {
} else if (action.actionType == TRANS_ACTION_MSG) {
SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER);
SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER)
SDB_GET_INT8(pRaw, dataPos, &action.msgSent, _OVER)
......@@ -359,6 +375,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER);
if (taosArrayPush(pTrans->commitActions, &action) == NULL) goto _OVER;
action.pCont = NULL;
} else {
if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER;
}
}
......@@ -552,10 +570,12 @@ static void mndTransDropActions(SArray *pArray) {
int32_t size = taosArrayGetSize(pArray);
for (int32_t i = 0; i < size; ++i) {
STransAction *pAction = taosArrayGet(pArray, i);
if (pAction->actionType) {
if (pAction->actionType == TRANS_ACTION_RAW) {
taosMemoryFreeClear(pAction->pRaw);
} else {
} else if (pAction->actionType == TRANS_ACTION_MSG) {
taosMemoryFreeClear(pAction->pCont);
} else {
// nothing
}
}
......@@ -583,27 +603,34 @@ static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) {
}
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) {
STransAction action = {.stage = TRN_STAGE_REDO_ACTION, .actionType = true, .pRaw = pRaw};
STransAction action = {.stage = TRN_STAGE_REDO_ACTION, .actionType = TRANS_ACTION_RAW, .pRaw = pRaw};
return mndTransAppendAction(pTrans->redoActions, &action);
}
int32_t mndTransAppendNullLog(STrans *pTrans) {
STransAction action = {.stage = TRN_STAGE_REDO_ACTION, .actionType = TRANS_ACTION_NULL};
return mndTransAppendAction(pTrans->redoActions, &action);
}
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw) {
STransAction action = {.stage = TRN_STAGE_UNDO_ACTION, .actionType = true, .pRaw = pRaw};
STransAction action = {.stage = TRN_STAGE_UNDO_ACTION, .actionType = TRANS_ACTION_RAW, .pRaw = pRaw};
return mndTransAppendAction(pTrans->undoActions, &action);
}
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) {
STransAction action = {.stage = TRN_STAGE_COMMIT_ACTION, .actionType = true, .pRaw = pRaw};
STransAction action = {.stage = TRN_STAGE_COMMIT_ACTION, .actionType = TRANS_ACTION_RAW, .pRaw = pRaw};
return mndTransAppendAction(pTrans->commitActions, &action);
}
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction) {
pAction->stage = TRN_STAGE_REDO_ACTION;
pAction->actionType = TRANS_ACTION_MSG;
return mndTransAppendAction(pTrans->redoActions, pAction);
}
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction) {
pAction->stage = TRN_STAGE_UNDO_ACTION;
pAction->actionType = TRANS_ACTION_MSG;
return mndTransAppendAction(pTrans->undoActions, pAction);
}
......@@ -782,7 +809,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
}
}
void mndTransProcessRsp(SRpcMsg *pRsp) {
int32_t mndTransProcessRsp(SRpcMsg *pRsp) {
SMnode *pMnode = pRsp->info.node;
int64_t signature = (int64_t)(pRsp->info.ahandle);
int32_t transId = (int32_t)(signature >> 32);
......@@ -827,6 +854,7 @@ void mndTransProcessRsp(SRpcMsg *pRsp) {
_OVER:
mndReleaseTrans(pMnode, pTrans);
return 0;
}
static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) {
......@@ -899,10 +927,15 @@ static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransActio
}
static int32_t mndTransExecSingleAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
if (pAction->actionType) {
if (pAction->actionType == TRANS_ACTION_RAW) {
return mndTransWriteSingleLog(pMnode, pTrans, pAction);
} else {
} else if (pAction->actionType == TRANS_ACTION_MSG) {
return mndTransSendSingleMsg(pMnode, pTrans, pAction);
} else {
pAction->rawWritten = 0;
pAction->errCode = 0;
mDebug("trans:%d, %s:%d null action executed", pTrans->id, mndTransStr(pAction->stage), pAction->id);
return 0;
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册