提交 6f0f7c58 编写于 作者: S Shengliang Guan

TD-10431 build create vnode msg

上级 8bba00f2
......@@ -30,8 +30,8 @@ void mndTransDrop(STrans *pTrans);
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendRedoAction(STrans *pTrans, SEpSet *pEpSet, SRpcMsg *pMsg);
int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, SRpcMsg *pMsg);
int32_t mndTransAppendRedoAction(STrans *pTrans, SEpSet *pEpSet, int8_t msgType, int32_t contLen, void *pCont);
int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, int8_t msgType, int32_t contLen, void *pCont);
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
void mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg, int32_t code);
char *mndTransStageStr(ETrnStage stage);
......
......@@ -303,9 +303,8 @@ static int32_t mndSetRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SV
return -1;
}
SRpcMsg rpcMsg = {.msgType = TSDB_MSG_TYPE_ALTER_VNODE_IN, .pCont = pMsg, .contLen = sizeof(SCreateVnodeMsg)};
if (mndTransAppendRedoAction(pTrans, &epset, &rpcMsg) != 0) {
rpcFreeCont(pMsg);
if (mndTransAppendRedoAction(pTrans, &epset, TSDB_MSG_TYPE_ALTER_VNODE_IN, sizeof(SCreateVnodeMsg), pMsg) != 0) {
free(pMsg);
return -1;
}
}
......@@ -333,9 +332,8 @@ static int32_t mndSetUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SV
return -1;
}
SRpcMsg rpcMsg = {.msgType = TSDB_MSG_TYPE_DROP_VNODE_IN, .pCont = pMsg, .contLen = sizeof(SDropVnodeMsg)};
if (mndTransAppendUndoAction(pTrans, &epset, &rpcMsg) != 0) {
rpcFreeCont(pMsg);
if (mndTransAppendUndoAction(pTrans, &epset, TSDB_MSG_TYPE_DROP_VNODE_IN, sizeof(SDropVnodeMsg), pMsg) != 0) {
free(pMsg);
return -1;
}
}
......
......@@ -23,7 +23,9 @@
typedef struct {
SEpSet epSet;
SRpcMsg msg;
int8_t msgType;
int32_t contLen;
void *pCont;
} STransAction;
static SSdbRaw *mndTransActionEncode(STrans *pTrans);
......@@ -35,10 +37,11 @@ static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans);
static void mndTransSetRpcHandle(STrans *pTrans, void *rpcHandle);
static void mndTransSendRpcRsp(STrans *pTrans, int32_t code);
static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw);
static int32_t mndTransAppendAction(SArray *pArray, SEpSet *pEpSet, SRpcMsg *pMsg);
static void mndTransDropLog(SArray *pArray);
static void mndTransDropAction(SArray *pArray);
static int32_t mndTransExecuteArray(SMnode *pMnode, SArray *pArray);
static int32_t mndTransAppendAction(SArray *pArray, SEpSet *pEpSet, int8_t msgType, int32_t contLen, void *pCont);
static void mndTransDropLogs(SArray *pArray);
static void mndTransDropActions(SArray *pArray);
static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray);
static int32_t mndTransExecuteActions(SMnode *pMnode, SArray *pArray);
static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans);
......@@ -89,12 +92,12 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
for (int32_t i = 0; i < redoActionNum; ++i) {
STransAction *pAction = taosArrayGet(pTrans->redoActions, i);
rawDataLen += (sizeof(STransAction) + pAction->msg.contLen);
rawDataLen += (sizeof(STransAction) + pAction->contLen);
}
for (int32_t i = 0; i < undoActionNum; ++i) {
STransAction *pAction = taosArrayGet(pTrans->undoActions, i);
rawDataLen += (sizeof(STransAction) + pAction->msg.contLen);
rawDataLen += (sizeof(STransAction) + pAction->contLen);
}
SSdbRaw *pRaw = sdbAllocRaw(SDB_TRANS, TSDB_TRANS_VER, rawDataLen);
......@@ -136,17 +139,17 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
for (int32_t i = 0; i < redoActionNum; ++i) {
STransAction *pAction = taosArrayGet(pTrans->redoActions, i);
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet));
SDB_SET_INT8(pRaw, dataPos, pAction->msg.msgType)
SDB_SET_INT32(pRaw, dataPos, pAction->msg.contLen)
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->msg.pCont, pAction->msg.contLen);
SDB_SET_INT8(pRaw, dataPos, pAction->msgType)
SDB_SET_INT32(pRaw, dataPos, pAction->contLen)
SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen);
}
for (int32_t i = 0; i < undoActionNum; ++i) {
STransAction *pAction = taosArrayGet(pTrans->undoActions, i);
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet));
SDB_SET_INT8(pRaw, dataPos, pAction->msg.msgType)
SDB_SET_INT32(pRaw, dataPos, pAction->msg.contLen)
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->msg.pCont, pAction->msg.contLen);
SDB_SET_INT8(pRaw, dataPos, pAction->msgType)
SDB_SET_INT32(pRaw, dataPos, pAction->contLen)
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pCont, pAction->contLen);
}
SDB_SET_RESERVE(pRaw, dataPos, TSDB_TRN_RESERVE_SIZE)
......@@ -247,14 +250,14 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
for (int32_t i = 0; i < redoActionNum; ++i) {
STransAction action = {0};
SDB_GET_BINARY(pRaw, pRow, dataPos, (void *)&action.epSet, sizeof(SEpSet));
SDB_GET_INT8(pRaw, pRow, dataPos, &action.msg.msgType)
SDB_GET_INT32(pRaw, pRow, dataPos, &action.msg.contLen)
action.msg.pCont = rpcMallocCont(action.msg.contLen);
if (action.msg.pCont == NULL) {
SDB_GET_INT8(pRaw, pRow, dataPos, &action.msgType)
SDB_GET_INT32(pRaw, pRow, dataPos, &action.contLen)
action.pCont = malloc(action.contLen);
if (action.pCont == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto TRANS_DECODE_OVER;
}
SDB_GET_BINARY(pRaw, pRow, dataPos, action.msg.pCont, action.msg.contLen);
SDB_GET_BINARY(pRaw, pRow, dataPos, action.pCont, action.contLen);
void *ret = taosArrayPush(pTrans->redoActions, &action);
if (ret == NULL) {
......@@ -266,14 +269,14 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
for (int32_t i = 0; i < undoActionNum; ++i) {
STransAction action = {0};
SDB_GET_BINARY(pRaw, pRow, dataPos, (void *)&action.epSet, sizeof(SEpSet));
SDB_GET_INT8(pRaw, pRow, dataPos, &action.msg.msgType)
SDB_GET_INT32(pRaw, pRow, dataPos, &action.msg.contLen)
action.msg.pCont = rpcMallocCont(action.msg.contLen);
if (action.msg.pCont == NULL) {
SDB_GET_INT8(pRaw, pRow, dataPos, &action.msgType)
SDB_GET_INT32(pRaw, pRow, dataPos, &action.contLen)
action.pCont = malloc(action.contLen);
if (action.pCont == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto TRANS_DECODE_OVER;
}
SDB_GET_BINARY(pRaw, pRow, dataPos, action.msg.pCont, action.msg.contLen);
SDB_GET_BINARY(pRaw, pRow, dataPos, action.pCont, action.contLen);
void *ret = taosArrayPush(pTrans->undoActions, &action);
if (ret == NULL) {
......@@ -305,11 +308,11 @@ static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) {
static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) {
mTrace("trans:%d, perform delete action, stage:%s", pTrans->id, mndTransStageStr(pTrans->stage));
mndTransDropLog(pTrans->redoLogs);
mndTransDropLog(pTrans->undoLogs);
mndTransDropLog(pTrans->commitLogs);
mndTransDropAction(pTrans->redoActions);
mndTransDropAction(pTrans->undoActions);
mndTransDropLogs(pTrans->redoLogs);
mndTransDropLogs(pTrans->undoLogs);
mndTransDropLogs(pTrans->commitLogs);
mndTransDropActions(pTrans->redoActions);
mndTransDropActions(pTrans->undoActions);
return 0;
}
......@@ -389,7 +392,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) {
return pTrans;
}
static void mndTransDropLog(SArray *pArray) {
static void mndTransDropLogs(SArray *pArray) {
for (int32_t i = 0; i < pArray->size; ++i) {
SSdbRaw *pRaw = taosArrayGetP(pArray, i);
tfree(pRaw);
......@@ -398,21 +401,21 @@ static void mndTransDropLog(SArray *pArray) {
taosArrayDestroy(pArray);
}
static void mndTransDropAction(SArray *pArray) {
static void mndTransDropActions(SArray *pArray) {
for (int32_t i = 0; i < pArray->size; ++i) {
STransAction *pAction = taosArrayGet(pArray, i);
rpcFreeCont(pAction->msg.pCont);
free(pAction->pCont);
}
taosArrayDestroy(pArray);
}
void mndTransDrop(STrans *pTrans) {
mndTransDropLog(pTrans->redoLogs);
mndTransDropLog(pTrans->undoLogs);
mndTransDropLog(pTrans->commitLogs);
mndTransDropAction(pTrans->redoActions);
mndTransDropAction(pTrans->undoActions);
mndTransDropLogs(pTrans->redoLogs);
mndTransDropLogs(pTrans->undoLogs);
mndTransDropLogs(pTrans->commitLogs);
mndTransDropActions(pTrans->redoActions);
mndTransDropActions(pTrans->undoActions);
mDebug("trans:%d, data:%p is dropped", pTrans->id, pTrans);
tfree(pTrans);
......@@ -456,8 +459,8 @@ int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) {
return code;
}
static int32_t mndTransAppendAction(SArray *pArray, SEpSet *pEpSet, SRpcMsg *pMsg) {
STransAction action = {.epSet = *pEpSet, .msg = *pMsg};
static int32_t mndTransAppendAction(SArray *pArray, SEpSet *pEpSet, int8_t msgType, int32_t contLen, void *pCont) {
STransAction action = {.epSet = *pEpSet, .msgType = msgType, .contLen = contLen, .pCont = pCont};
void *ptr = taosArrayPush(pArray, &action);
if (ptr == NULL) {
......@@ -468,15 +471,15 @@ static int32_t mndTransAppendAction(SArray *pArray, SEpSet *pEpSet, SRpcMsg *pMs
return 0;
}
int32_t mndTransAppendRedoAction(STrans *pTrans, SEpSet *pEpSet, SRpcMsg *pMsg) {
int32_t code = mndTransAppendAction(pTrans->redoActions, pEpSet, pMsg);
mTrace("trans:%d, msg:%p append to redo actions", pTrans->id, pMsg);
int32_t mndTransAppendRedoAction(STrans *pTrans, SEpSet *pEpSet, int8_t msgType, int32_t contLen, void *pCont) {
int32_t code = mndTransAppendAction(pTrans->redoActions, pEpSet, msgType, contLen, pCont);
mTrace("trans:%d, msg:%s len:%d append to redo actions", pTrans->id, taosMsg[msgType], contLen);
return code;
}
int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, SRpcMsg *pMsg) {
int32_t code = mndTransAppendAction(pTrans->undoActions, pEpSet, pMsg);
mTrace("trans:%d, msg:%p append to undo actions", pTrans->id, pMsg);
int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, int8_t msgType, int32_t contLen, void *pCont) {
int32_t code = mndTransAppendAction(pTrans->undoActions, pEpSet, msgType, contLen, pCont);
mTrace("trans:%d, msg:%s len:%d append to undo actions", pTrans->id, taosMsg[msgType], contLen);
return code;
}
......@@ -593,7 +596,7 @@ void mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg, int32_t code)
// todo
}
static int32_t mndTransExecuteArray(SMnode *pMnode, SArray *pArray) {
static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray) {
SSdb *pSdb = pMnode->pSdb;
int32_t arraySize = taosArrayGetSize(pArray);
......@@ -611,7 +614,7 @@ static int32_t mndTransExecuteArray(SMnode *pMnode, SArray *pArray) {
static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans) {
int32_t code = 0;
if (taosArrayGetSize(pTrans->redoLogs) != 0) {
code = mndTransExecuteArray(pMnode, pTrans->redoLogs);
code = mndTransExecuteLogs(pMnode, pTrans->redoLogs);
if (code != 0) {
mError("trans:%d, failed to execute redo logs since %s", pTrans->id, terrstr())
} else {
......@@ -625,7 +628,7 @@ static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans) {
static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans) {
int32_t code = 0;
if (taosArrayGetSize(pTrans->undoLogs) != 0) {
code = mndTransExecuteArray(pMnode, pTrans->undoLogs);
code = mndTransExecuteLogs(pMnode, pTrans->undoLogs);
if (code != 0) {
mError("trans:%d, failed to execute undo logs since %s", pTrans->id, terrstr())
} else {
......@@ -639,7 +642,7 @@ static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans) {
static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans) {
int32_t code = 0;
if (taosArrayGetSize(pTrans->commitLogs) != 0) {
code = mndTransExecuteArray(pMnode, pTrans->commitLogs);
code = mndTransExecuteLogs(pMnode, pTrans->commitLogs);
if (code != 0) {
mError("trans:%d, failed to execute commit logs since %s", pTrans->id, terrstr())
} else {
......@@ -651,36 +654,39 @@ static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans) {
}
static int32_t mndTransExecuteActions(SMnode *pMnode, SArray *pArray) {
SSdb *pSdb = pMnode->pSdb;
#if 0
int32_t arraySize = taosArrayGetSize(pArray);
for (int32_t i = 0; i < arraySize; ++i) {
SSdbRaw *pRaw = taosArrayGetP(pArray, i);
int32_t code = sdbWriteNotFree(pSdb, pRaw);
if (code != 0) {
return code;
STransAction *pAction = taosArrayGet(pArray, i);
SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen};
rpcMsg.pCont = rpcMallocCont(pAction->contLen);
if (rpcMsg.pCont == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen);
mndSendMsgToDnode(pMnode, &pAction->epSet, &rpcMsg);
}
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
#else
return 0;
#endif
}
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) {
int32_t code = 0;
if (taosArrayGetSize(pTrans->redoActions) != 0) {
mTrace("trans:%d, execute redo actions finished", pTrans->id);
}
if (taosArrayGetSize(pTrans->redoActions) <= 0) return 0;
return code;
mTrace("trans:%d, start to execute redo actions", pTrans->id);
return mndTransExecuteActions(pMnode, pTrans->redoActions);
}
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) {
int32_t code = 0;
if (taosArrayGetSize(pTrans->undoActions) != 0) {
mTrace("trans:%d, execute undo actions finished", pTrans->id);
}
if (taosArrayGetSize(pTrans->undoActions) <= 0) return 0;
return code;
mTrace("trans:%d, start to execute undo actions", pTrans->id);
return mndTransExecuteActions(pMnode, pTrans->undoActions);
}
static int32_t mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) {
......
......@@ -158,7 +158,7 @@ void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) {
}
SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) {
SCreateVnodeMsg *pCreate = rpcMallocCont(sizeof(SCreateVnodeMsg));
SCreateVnodeMsg *pCreate = malloc(sizeof(SCreateVnodeMsg));
if (pCreate == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
......@@ -192,7 +192,7 @@ SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbOb
SVnodeGid *pVgid = &pVgroup->vnodeGid[v];
SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
if (pVgidDnode == NULL) {
rpcFreeCont(pCreate);
free(pCreate);
terrno = TSDB_CODE_MND_APP_ERROR;
return NULL;
}
......@@ -208,7 +208,7 @@ SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbOb
}
if (pCreate->selfIndex == -1) {
rpcFreeCont(pCreate);
free(pCreate);
terrno = TSDB_CODE_MND_APP_ERROR;
return NULL;
}
......@@ -217,7 +217,7 @@ SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbOb
}
SDropVnodeMsg *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) {
SDropVnodeMsg *pDrop = rpcMallocCont(sizeof(SDropVnodeMsg));
SDropVnodeMsg *pDrop = malloc(sizeof(SDropVnodeMsg));
if (pDrop == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册