提交 71258751 编写于 作者: S Shengliang Guan

shm

上级 27c17bcc
...@@ -115,21 +115,13 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr ...@@ -115,21 +115,13 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
*/ */
int32_t mndRetriveAuth(SMnode *pMnode, char *user, char *spi, char *encrypt, char *secret, char *ckey); int32_t mndRetriveAuth(SMnode *pMnode, char *user, char *spi, char *encrypt, char *secret, char *ckey);
/**
* @brief Cleanup mnode msg.
*
* @param pMsg The request msg.
* @param code The error code.
*/
void mndSendRsp(SMndMsg *pMsg, int32_t code);
/** /**
* @brief Process the read, write, sync request. * @brief Process the read, write, sync request.
* *
* @param pMsg The request msg. * @param pMsg The request msg.
* @return int32_t 0 for success, -1 for failure. * @return int32_t 0 for success, -1 for failure.
*/ */
void mndProcessMsg(SMndMsg *pMsg); int32_t mndProcessMsg(SMndMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -25,21 +25,21 @@ static int32_t mmProcessSyncMsg(SDnode *pDnode, SMndMsg *pMsg); ...@@ -25,21 +25,21 @@ static int32_t mmProcessSyncMsg(SDnode *pDnode, SMndMsg *pMsg);
static int32_t mmProcessReadMsg(SDnode *pDnode, SMndMsg *pMsg); static int32_t mmProcessReadMsg(SDnode *pDnode, SMndMsg *pMsg);
static int32_t mmPutMndMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SMndMsg *pMsg); static int32_t mmPutMndMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SMndMsg *pMsg);
static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pRpc); static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pRpc);
static void mmConsumeQueue(SDnode *pDnode, SMndMsg *pMsg); static void mmConsumeMsgQueue(SDnode *pDnode, SMndMsg *pMsg);
int32_t mmStartWorker(SDnode *pDnode) { int32_t mmStartWorker(SDnode *pDnode) {
SMndMgmt *pMgmt = &pDnode->mmgmt; SMndMgmt *pMgmt = &pDnode->mmgmt;
if (dndInitWorker(pDnode, &pMgmt->readWorker, DND_WORKER_SINGLE, "mnode-read", 0, 1, mmConsumeQueue) != 0) { if (dndInitWorker(pDnode, &pMgmt->readWorker, DND_WORKER_SINGLE, "mnode-read", 0, 1, mmConsumeMsgQueue) != 0) {
dError("failed to start mnode read worker since %s", terrstr()); dError("failed to start mnode read worker since %s", terrstr());
return -1; return -1;
} }
if (dndInitWorker(pDnode, &pMgmt->writeWorker, DND_WORKER_SINGLE, "mnode-write", 0, 1, mmConsumeQueue) != 0) { if (dndInitWorker(pDnode, &pMgmt->writeWorker, DND_WORKER_SINGLE, "mnode-write", 0, 1, mmConsumeMsgQueue) != 0) {
dError("failed to start mnode write worker since %s", terrstr()); dError("failed to start mnode write worker since %s", terrstr());
return -1; return -1;
} }
if (dndInitWorker(pDnode, &pMgmt->syncWorker, DND_WORKER_SINGLE, "mnode-sync", 0, 1, mmConsumeQueue) != 0) { if (dndInitWorker(pDnode, &pMgmt->syncWorker, DND_WORKER_SINGLE, "mnode-sync", 0, 1, mmConsumeMsgQueue) != 0) {
dError("failed to start mnode sync worker since %s", terrstr()); dError("failed to start mnode sync worker since %s", terrstr());
return -1; return -1;
} }
...@@ -271,18 +271,36 @@ void mmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pC ...@@ -271,18 +271,36 @@ void mmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pC
void mmConsumeParentQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) {} void mmConsumeParentQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) {}
static void mmConsumeQueue(SDnode *pDnode, SMndMsg *pMsg) { static void mmConsumeMsgQueue(SDnode *pDnode, SMndMsg *pMsg) {
SMndMgmt *pMgmt = &pDnode->mmgmt; SMnode *pMnode = mmAcquire(pDnode);
SRpcMsg *pRpc = &pMsg->rpcMsg;
tmsg_t msgType = pMsg->rpcMsg.msgType;
void *ahandle = pMsg->rpcMsg.ahandle;
bool isReq = (pRpc->msgType & 1U);
int32_t code = -1;
SMnode *pMnode = mmAcquire(pDnode);
if (pMnode != NULL) { if (pMnode != NULL) {
pMsg->pMnode = pMnode; pMsg->pMnode = pMnode;
mndProcessMsg(pMsg); code = mndProcessMsg(pMsg);
mmRelease(pDnode, pMnode); mmRelease(pDnode, pMnode);
} else { }
mndSendRsp(pMsg, terrno);
if (isReq) {
if (pMsg->rpcMsg.handle == NULL) return;
if (code == 0) {
SRpcMsg rsp = {.handle = pRpc->handle, .contLen = pMsg->contLen, .pCont = pMsg->pCont};
rpcSendResponse(&rsp);
} else {
if (terrno == TSDB_CODE_APP_NOT_READY) {
dndSendRedirectRsp(pDnode, pRpc);
} else if (terrno == TSDB_CODE_MND_ACTION_IN_PROGRESS) {
} else {
SRpcMsg rsp = {.handle = pRpc->handle, .contLen = pMsg->contLen, .pCont = pMsg->pCont, .code = terrno};
rpcSendResponse(&rsp);
}
}
} }
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
rpcFreeCont(pMsg->rpcMsg.pCont); rpcFreeCont(pRpc->pCont);
} }
...@@ -128,7 +128,6 @@ typedef struct SMnode { ...@@ -128,7 +128,6 @@ typedef struct SMnode {
int32_t mndSendReqToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *rpcMsg); int32_t mndSendReqToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *rpcMsg);
int32_t mndSendReqToMnode(SMnode *pMnode, SRpcMsg *pMsg); int32_t mndSendReqToMnode(SMnode *pMnode, SRpcMsg *pMsg);
void mndSendRedirectRsp(SMnode *pMnode, SRpcMsg *pMsg);
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp); void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp);
uint64_t mndGenerateUid(char *name, int32_t len); uint64_t mndGenerateUid(char *name, int32_t len);
......
...@@ -60,12 +60,6 @@ int32_t mndSendReqToMnode(SMnode *pMnode, SRpcMsg *pMsg) { ...@@ -60,12 +60,6 @@ int32_t mndSendReqToMnode(SMnode *pMnode, SRpcMsg *pMsg) {
return (*pMnode->sendReqToMnodeFp)(pMnode->pDnode, pMsg); return (*pMnode->sendReqToMnodeFp)(pMnode->pDnode, pMsg);
} }
void mndSendRedirectRsp(SMnode *pMnode, SRpcMsg *pMsg) {
if (pMnode != NULL && pMnode->sendRedirectRspFp != NULL) {
(*pMnode->sendRedirectRspFp)(pMnode->pDnode, pMsg);
}
}
static void *mndBuildTimerMsg(int32_t *pContLen) { static void *mndBuildTimerMsg(int32_t *pContLen) {
SMTimerReq timerReq = {0}; SMTimerReq timerReq = {0};
...@@ -390,65 +384,45 @@ void mndDestroy(const char *path) { ...@@ -390,65 +384,45 @@ void mndDestroy(const char *path) {
mDebug("mnode is destroyed"); mDebug("mnode is destroyed");
} }
void mndSendRsp(SMndMsg *pMsg, int32_t code) { int32_t mndProcessMsg(SMndMsg *pMsg) {
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .code = code}; SMnode *pMnode = pMsg->pMnode;
rpcSendResponse(&rpcRsp); SRpcMsg *pRpc = &pMsg->rpcMsg;
} tmsg_t msgType = pMsg->rpcMsg.msgType;
void *ahandle = pMsg->rpcMsg.ahandle;
void mndProcessMsg(SMndMsg *pMsg) { bool isReq = (pRpc->msgType & 1U);
SMnode *pMnode = pMsg->pMnode;
int32_t code = 0;
tmsg_t msgType = pMsg->rpcMsg.msgType;
void *ahandle = pMsg->rpcMsg.ahandle;
bool isReq = (msgType & 1U);
mTrace("msg:%p, type:%s will be processed, app:%p", pMsg, TMSG_INFO(msgType), ahandle); mTrace("msg:%p, type:%s will be processed, app:%p", pMsg, TMSG_INFO(msgType), ahandle);
if (isReq && !mndIsMaster(pMnode)) { if (isReq && !mndIsMaster(pMnode)) {
code = TSDB_CODE_APP_NOT_READY; terrno = TSDB_CODE_APP_NOT_READY;
mDebug("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle); mDebug("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
goto PROCESS_RPC_END; return -1;
} }
if (isReq && pMsg->rpcMsg.pCont == NULL) { if (isReq && (pRpc->contLen == 0 || pRpc->pCont == NULL)) {
code = TSDB_CODE_MND_INVALID_MSG_LEN; terrno = TSDB_CODE_MND_INVALID_MSG_LEN;
mError("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle); mError("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
goto PROCESS_RPC_END; return -1;
} }
MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(msgType)]; MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(msgType)];
if (fp == NULL) { if (fp == NULL) {
code = TSDB_CODE_MSG_NOT_PROCESSED; terrno = TSDB_CODE_MSG_NOT_PROCESSED;
mError("msg:%p, failed to process since no msg handle, app:%p", pMsg, ahandle); mError("msg:%p, failed to process since no msg handle, app:%p", pMsg, ahandle);
goto PROCESS_RPC_END; return -1;
} }
code = (*fp)(pMsg); int32_t code = (*fp)(pMsg);
if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) {
terrno = code;
mTrace("msg:%p, in progress, app:%p", pMsg, ahandle); mTrace("msg:%p, in progress, app:%p", pMsg, ahandle);
return;
} else if (code != 0) { } else if (code != 0) {
code = terrno;
mError("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle); mError("msg:%p, failed to process since %s, app:%p", pMsg, terrstr(), ahandle);
goto PROCESS_RPC_END;
} else { } else {
mTrace("msg:%p, is processed, app:%p", pMsg, ahandle); mTrace("msg:%p, is processed, app:%p", pMsg, ahandle);
} }
PROCESS_RPC_END: return code;
if (isReq) {
if (pMsg->rpcMsg.handle == NULL) return;
if (code == TSDB_CODE_APP_NOT_READY) {
mndSendRedirectRsp(pMnode, &pMsg->rpcMsg);
} else if (code != 0) {
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .contLen = pMsg->contLen, .pCont = pMsg->pCont, .code = code};
rpcSendResponse(&rpcRsp);
} else {
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .contLen = pMsg->contLen, .pCont = pMsg->pCont};
rpcSendResponse(&rpcRsp);
}
}
} }
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) { void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册