提交 143a3113 编写于 作者: S Shengliang Guan

shm

上级 bd660a5b
......@@ -49,7 +49,6 @@ typedef struct SRpcMsg {
typedef struct {
char user[TSDB_USER_LEN];
SRpcMsg rpcMsg;
SEpSet rpcEpSet;
int32_t rspLen;
void *pRsp;
} SNodeMsg;
......
......@@ -72,15 +72,13 @@ typedef struct SQnodeMgmt SQnodeMgmt;
typedef struct SSnodeMgmt SSnodeMgmt;
typedef struct SBnodeMgmt SBnodeMgmt;
typedef void (*RpcMsgFp)(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEps);
typedef void (*NodeMsgFp)(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
typedef int32_t (*NodeMsgFp)(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
typedef int32_t (*OpenNodeFp)(SMgmtWrapper *pWrapper);
typedef void (*CloseNodeFp)(SMgmtWrapper *pWrapper);
typedef bool (*RequireNodeFp)(SMgmtWrapper *pWrapper);
typedef struct SMsgHandle {
RpcMsgFp rpcMsgFp;
NodeMsgFp nodeMsgFp;
NodeMsgFp msgFp;
SMgmtWrapper *pWrapper;
} SMsgHandle;
......@@ -98,7 +96,7 @@ typedef struct SMgmtWrapper {
SProcObj *pProc;
void *pMgmt;
SDnode *pDnode;
SMsgHandle msgHandles[TDMT_MAX];
NodeMsgFp msgFps[TDMT_MAX];
SMgmtFp fp;
} SMgmtWrapper;
......
......@@ -65,11 +65,7 @@ void dndCleanup() {
SMgmtWrapper *dndGetWrapper(SDnode *pDnode, ENodeType nodeType) { return &pDnode->wrappers[nodeType]; }
void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp) {
SMsgHandle *pHandle = &pWrapper->msgHandles[TMSG_INDEX(msgType)];
pHandle->pWrapper = pWrapper;
pHandle->nodeMsgFp = nodeMsgFp;
pHandle->rpcMsgFp = dndProcessRpcMsg;
pWrapper->msgFps[TMSG_INDEX(msgType)] = nodeMsgFp;
}
EDndStatus dndGetStatus(SDnode *pDnode) { return pDnode->status; }
......
......@@ -257,8 +257,65 @@ void dndeHandleEvent(SDnode *pDnode, EDndEvent event) {
pDnode->event = event;
}
void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet) {
if (pEpSet && pEpSet->numOfEps > 0 && pMsg->msgType == TDMT_MND_STATUS_RSP) {
static int32_t dndBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc, SEpSet *pEpSet) {
SRpcConnInfo connInfo = {0};
if ((pRpc->msgType & 1U) && rpcGetConnInfo(pRpc->handle, &connInfo) != 0) {
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
dError("failed to build msg since %s, app:%p RPC:%p", terrstr(), pRpc->ahandle, pRpc->handle);
return -1;
}
memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN);
pMsg->rpcMsg = *pRpc;
return 0;
}
static void dndSendRpcRsp(SDnode *pDnode, SRpcMsg *pRpc) {
if (pRpc->code == TSDB_CODE_APP_NOT_READY) {
dmSendRedirectRsp(pDnode, pRpc);
} else {
rpcSendResponse(pRpc);
}
}
void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) {
if (pEpSet && pEpSet->numOfEps > 0 && pRpc->msgType == TDMT_MND_STATUS_RSP) {
dmUpdateMnodeEpSet(pWrapper->pDnode, pEpSet);
}
int32_t code = -1;
SNodeMsg *pMsg = NULL;
NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)];
if (msgFp == NULL) {
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
goto _OVER;
}
pMsg = taosAllocateQitem(sizeof(SNodeMsg));
if (pMsg == NULL) {
goto _OVER;
}
if (dndBuildMsg(pMsg, pRpc, pEpSet) != 0) {
goto _OVER;
}
dTrace("msg:%p, is created, app:%p RPC:%p user:%s, processd by %s", pMsg, pRpc->ahandle, pRpc->handle, pMsg->user,
pWrapper->name);
code = (*msgFp)(pWrapper, pMsg);
_OVER:
if (code != 0) {
bool isReq = (pRpc->msgType & 1U);
if (isReq) {
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
dndSendRpcRsp(pWrapper->pDnode, &rsp);
}
dTrace("msg:%p, is freed", pMsg);
taosFreeQitem(pMsg);
rpcFreeCont(pRpc->pCont);
}
}
\ No newline at end of file
......@@ -36,10 +36,10 @@ static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {
}
SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)];
if (pHandle->rpcMsgFp != NULL) {
if (pHandle->msgFp != NULL) {
dTrace("RPC %p, rsp:%s will be processed by %s, code:0x%x app:%p", pRsp->handle, TMSG_INFO(msgType),
pHandle->pWrapper->name, pRsp->code & 0XFFFF, pRsp->ahandle);
(*pHandle->rpcMsgFp)(pHandle->pWrapper, pRsp, pEpSet);
dndProcessRpcMsg(pHandle->pWrapper, pRsp, pEpSet);
} else {
dError("RPC %p, rsp:%s not processed, app:%p", pRsp->handle, TMSG_INFO(msgType), pRsp->ahandle);
rpcFreeCont(pRsp->pCont);
......@@ -118,10 +118,10 @@ static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) {
}
SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)];
if (pHandle->rpcMsgFp != NULL) {
if (pHandle->msgFp != NULL) {
dTrace("RPC %p, req:%s will be processed by %s, app:%p", pReq->handle, TMSG_INFO(msgType), pHandle->pWrapper->name,
pReq->ahandle);
(*pHandle->rpcMsgFp)(pHandle->pWrapper, pReq, pEpSet);
dndProcessRpcMsg(pHandle->pWrapper, pReq, pEpSet);
} else {
dError("RPC %p, req:%s not processed since no handle, app:%p", pReq->handle, TMSG_INFO(msgType), pReq->ahandle);
SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED, .ahandle = pReq->ahandle};
......@@ -253,17 +253,17 @@ int32_t dndInitMsgHandle(SDnode *pDnode) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[nodeType];
for (int32_t msgIndex = 0; msgIndex < TDMT_MAX; ++msgIndex) {
SMsgHandle msgHandle = pWrapper->msgHandles[msgIndex];
if (msgHandle.rpcMsgFp == NULL) continue;
NodeMsgFp msgFp = pWrapper->msgFps[msgIndex];
if (msgFp == NULL) continue;
SMsgHandle *pHandle = &pMgmt->msgHandles[msgIndex];
if (pHandle->rpcMsgFp != NULL) {
if (pHandle->msgFp != NULL) {
dError("msg:%s, has multiple process nodes, prev node:%s, curr node:%s", tMsgInfo[msgIndex],
pHandle->pWrapper->name, pWrapper->name);
return -1;
} else {
dDebug("msg:%s, will be processed by node:%s", tMsgInfo[msgIndex], pWrapper->name);
*pHandle = msgHandle;
pHandle->msgFp = msgFp;
}
}
}
......
......@@ -24,7 +24,7 @@ extern "C" {
int32_t dmStartWorker();
void dmStopWorker();
void dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
#ifdef __cplusplus
}
......
......@@ -161,7 +161,7 @@ void dmStopWorker(SDnodeMgmt *pMgmt) {
}
}
void dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SDnodeMgmt *pMgmt = pWrapper->pMgmt;
SDnodeWorker *pWorker = &pMgmt->mgmtWorker;
......@@ -169,12 +169,5 @@ void dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
pWorker = &pMgmt->statusWorker;
}
if (dndWriteMsgToWorker(pWorker, pMsg, sizeof(SNodeMsg)) != 0) {
if (pMsg->rpcMsg.msgType & 1u) {
SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, .code = TSDB_CODE_OUT_OF_MEMORY};
rpcSendResponse(&rsp);
}
rpcFreeCont(pMsg->rpcMsg.pCont);
taosFreeQitem(pMsg);
}
return dndWriteMsgToWorker(pWorker, pMsg, sizeof(SNodeMsg));
}
\ No newline at end of file
......@@ -31,9 +31,9 @@ int32_t mmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
void mmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
void mmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
void mmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void mmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t mmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t mmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
#ifdef __cplusplus
}
......
......@@ -268,6 +268,6 @@ static void mmConsumeMsgQueue(SDnode *pDnode, SMndMsg *pMsg) {
#endif
void mmProcessWriteMsg( SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {}
void mmProcessSyncMsg( SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {}
void mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {}
\ No newline at end of file
int32_t mmProcessWriteMsg( SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {return 0;}
int32_t mmProcessSyncMsg( SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {return 0;}
int32_t mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {return 0;}
\ No newline at end of file
......@@ -31,10 +31,10 @@ int32_t vmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg);
void vmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
void vmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen);
void vmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void vmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void vmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
void vmProcessFetchMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t vmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t vmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t vmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t vmProcessFetchMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
#ifdef __cplusplus
}
......
......@@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE
#include "vmWorker.h"
void vmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg){}
void vmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg){}
void vmProcessQueryMsg( SMgmtWrapper *pWrapper, SNodeMsg *pMsg){}
void vmProcessFetchMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg){}
\ No newline at end of file
int32_t vmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg){return 0;}
int32_t vmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg){return 0;}
int32_t vmProcessQueryMsg( SMgmtWrapper *pWrapper, SNodeMsg *pMsg){return 0;}
int32_t vmProcessFetchMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg){return 0;}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册