提交 f9c67813 编写于 作者: S Shengliang Guan

shm

上级 7912d662
......@@ -116,15 +116,6 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
*/
int32_t mndRetriveAuth(SMnode *pMnode, char *user, char *spi, char *encrypt, char *secret, char *ckey);
/**
* @brief Initialize mnode msg.
*
* @param pMnode The mnode object.
* @param pMsg The request rpc msg.
* @return int32_t The created mnode msg.
*/
int32_t mndBuildMsg(SMnodeMsg *pMnodeMsg, SRpcMsg *pRpcMsg);
/**
* @brief Cleanup mnode msg.
*
......
......@@ -82,6 +82,7 @@ typedef struct {
SProcObj *pProcess;
bool singleProc;
bool isChild;
bool testFlag;
} SMnodeMgmt;
typedef struct {
......
......@@ -117,8 +117,9 @@ void mmRelease(SDnode *pDnode, SMnode *pMnode) {
int32_t mmOpen(SDnode *pDnode, SMnodeOpt *pOption) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
pMgmt->singleProc = true;
pMgmt->singleProc = false;
pMgmt->isChild = false;
pMgmt->testFlag = true;
int32_t code = mmOpenImp(pDnode, pOption);
......
......@@ -20,11 +20,11 @@
#include "dndTransport.h"
#include "dndWorker.h"
static int32_t mmProcessWriteMsg(SDnode *pDnode, SMnodeMsg *pMnodeMsg);
static int32_t mmProcessSyncMsg(SDnode *pDnode, SMnodeMsg *pMnodeMsg);
static int32_t mmProcessReadMsg(SDnode *pDnode, SMnodeMsg *pMnodeMsg);
static int32_t mmPutMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SMnodeMsg *pMnodeMsg);
static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pRpcMsg);
static int32_t mmProcessWriteMsg(SDnode *pDnode, SMnodeMsg *pMsg);
static int32_t mmProcessSyncMsg(SDnode *pDnode, SMnodeMsg *pMsg);
static int32_t mmProcessReadMsg(SDnode *pDnode, SMnodeMsg *pMsg);
static int32_t mmPutMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SMnodeMsg *pMsg);
static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pMsg);
static void mmConsumeQueue(SDnode *pDnode, SMnodeMsg *pMsg);
int32_t mmStartWorker(SDnode *pDnode) {
......@@ -139,54 +139,76 @@ void mmInitMsgFp(SMnodeMgmt *pMgmt) {
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_STB_RSP)] = mmProcessWriteMsg;
}
void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpcMsg, SEpSet *pEpSet) {
static int32_t mndBuildMsg(SMnodeMsg *pMsg, SRpcMsg *pRpc) {
SRpcConnInfo connInfo = {0};
if ((pRpc->msgType & 1U) && rpcGetConnInfo(pRpc->handle, &connInfo) != 0) {
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
dError("failed to create msg since %s, app:%p RPC:%p", terrstr(), pRpc->ahandle, pRpc->handle);
return -1;
}
memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN);
pMsg->rpcMsg = *pRpc;
pMsg->createdTime = taosGetTimestampSec();
char *pCont = (char *)pMsg + sizeof(SMnodeMsg);
memcpy(pCont, pRpc->pCont, pRpc->contLen);
pMsg->rpcMsg = *pRpc;
pMsg->rpcMsg.pCont = pCont;
pMsg->createdTime = taosGetTimestampSec();
dTrace("msg:%p, is created, app:%p RPC:%p user:%s", pMsg, pRpc->ahandle, pRpc->handle, pMsg->user);
return 0;
}
void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
SMnodeMgmt *pMgmt = &pDnode->mmgmt;
int32_t code = -1;
SMnodeMsg *pMnodeMsg = NULL;
SMnodeMsg *pMsg = NULL;
MndMsgFp msgFp = pMgmt->msgFp[TMSG_INDEX(pRpcMsg->msgType)];
MndMsgFp msgFp = pMgmt->msgFp[TMSG_INDEX(pRpc->msgType)];
if (msgFp == NULL) {
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
goto _OVER;
}
int32_t contLen = sizeof(SMnodeMsg) + pRpcMsg->contLen;
pMnodeMsg = taosAllocateQitem(contLen);
if (pMnodeMsg == NULL) {
int32_t contLen = sizeof(SMnodeMsg) + pRpc->contLen;
pMsg = taosAllocateQitem(contLen);
if (pMsg == NULL) {
goto _OVER;
}
if (mndBuildMsg(pMnodeMsg, pRpcMsg) != 0) {
if (mndBuildMsg(pMsg, pRpc) != 0) {
goto _OVER;
}
if (pMgmt->singleProc) {
code = (*msgFp)(pDnode, pMnodeMsg);
code = (*msgFp)(pDnode, pMsg);
} else {
code = taosProcPushChild(pMgmt->pProcess, pMnodeMsg, contLen);
code = taosProcPushChild(pMgmt->pProcess, pMsg, contLen);
}
_OVER:
if (code == 0) {
if (!pMgmt->singleProc) {
taosFreeQitem(pMnodeMsg);
taosFreeQitem(pMsg);
}
} else {
bool isReq = (pRpcMsg->msgType & 1U);
bool isReq = (pRpc->msgType & 1U);
if (isReq) {
if (terrno == TSDB_CODE_DND_MNODE_NOT_DEPLOYED || terrno == TSDB_CODE_APP_NOT_READY) {
dndSendRedirectRsp(pDnode, pRpcMsg);
dndSendRedirectRsp(pDnode, pRpc);
} else {
SRpcMsg rsp = {.handle = pRpcMsg->handle, .ahandle = pRpcMsg->ahandle, .code = terrno};
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
rpcSendResponse(&rsp);
}
}
taosFreeQitem(pMnodeMsg);
taosFreeQitem(pMsg);
}
rpcFreeCont(pRpcMsg->pCont);
rpcFreeCont(pRpc->pCont);
}
int32_t mmProcessWriteMsg(SDnode *pDnode, SMnodeMsg *pMnodeMsg) {
......@@ -261,5 +283,5 @@ static void mmConsumeQueue(SDnode *pDnode, SMnodeMsg *pMsg) {
mndSendRsp(pMsg, terrno);
}
// mndCleanupMsg(pMsg);
taosFreeQitem(pMsg);
}
......@@ -390,28 +390,6 @@ void mndDestroy(const char *path) {
mDebug("mnode is destroyed");
}
int32_t mndBuildMsg(SMnodeMsg *pMnodeMsg, SRpcMsg *pRpcMsg) {
if (pRpcMsg->msgType != TDMT_MND_TRANS_TIMER && pRpcMsg->msgType != TDMT_MND_MQ_TIMER &&
pRpcMsg->msgType != TDMT_MND_MQ_DO_REBALANCE && pRpcMsg->msgType != TDMT_MND_TELEM_TIMER) {
SRpcConnInfo connInfo = {0};
if ((pRpcMsg->msgType & 1U) && rpcGetConnInfo(pRpcMsg->handle, &connInfo) != 0) {
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
mError("failed to create msg since %s, app:%p RPC:%p", terrstr(), pRpcMsg->ahandle, pRpcMsg->handle);
return -1;
}
memcpy(pMnodeMsg->user, connInfo.user, TSDB_USER_LEN);
}
pMnodeMsg->rpcMsg = *pRpcMsg;
pMnodeMsg->createdTime = taosGetTimestampSec();
pMnodeMsg->pCont = (char*)pMnodeMsg + sizeof(pMnodeMsg);
if (pRpcMsg != NULL) {
mTrace("msg:%p, is created, app:%p RPC:%p user:%s", pMnodeMsg, pRpcMsg->ahandle, pRpcMsg->handle, pMnodeMsg->user);
}
return 0;
}
void mndSendRsp(SMnodeMsg *pMsg, int32_t code) {
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .code = code};
rpcSendResponse(&rpcRsp);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册