提交 20f1e3f5 编写于 作者: S Shengliang Guan

refact dnode

上级 ef0cfbe4
...@@ -58,6 +58,9 @@ typedef struct { ...@@ -58,6 +58,9 @@ typedef struct {
void *pNode; void *pNode;
} SNodeMsg; } SNodeMsg;
typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *);
typedef int (*RpcAfp)(void *parent, char *tableId, char *spi, char *encrypt, char *secret, char *ckey);
typedef struct SRpcInit { typedef struct SRpcInit {
uint16_t localPort; // local port uint16_t localPort; // local port
char * label; // for debug purpose char * label; // for debug purpose
...@@ -74,10 +77,10 @@ typedef struct SRpcInit { ...@@ -74,10 +77,10 @@ typedef struct SRpcInit {
char *ckey; // ciphering key char *ckey; // ciphering key
// call back to process incoming msg, code shall be ignored by server app // call back to process incoming msg, code shall be ignored by server app
void (*cfp)(void *parent, SRpcMsg *, SEpSet *); RpcCfp cfp;
// call back to retrieve the client auth info, for server app only // call back to retrieve the client auth info, for server app only
int (*afp)(void *parent, char *tableId, char *spi, char *encrypt, char *secret, char *ckey); RpcAfp afp;;
void *parent; void *parent;
} SRpcInit; } SRpcInit;
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
static void bmSendErrorRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) { static void bmSendErrorRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) {
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code}; SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code};
dndSendRsp(pWrapper, &rpcRsp); tmsgSendRsp(&rpcRsp);
dTrace("msg:%p, is freed", pMsg); dTrace("msg:%p, is freed", pMsg);
rpcFreeCont(pMsg->rpcMsg.pCont); rpcFreeCont(pMsg->rpcMsg.pCont);
......
...@@ -166,8 +166,6 @@ int32_t dndInitClient(SDnode *pDnode); ...@@ -166,8 +166,6 @@ int32_t dndInitClient(SDnode *pDnode);
void dndCleanupClient(SDnode *pDnode); void dndCleanupClient(SDnode *pDnode);
int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pMsg); int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pMsg);
void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp);
void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper); SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -78,7 +78,7 @@ static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t ...@@ -78,7 +78,7 @@ static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t
dError("msg:%p, failed to process since code:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); dError("msg:%p, failed to process since code:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
if (pRpc->msgType & 1U) { if (pRpc->msgType & 1U) {
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno}; SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
dndSendRsp(pWrapper, &rsp); tmsgSendRsp(&rsp);
} }
dTrace("msg:%p, is freed", pMsg); dTrace("msg:%p, is freed", pMsg);
......
...@@ -86,7 +86,7 @@ _OVER: ...@@ -86,7 +86,7 @@ _OVER:
dError("msg:%p, failed to process since 0x%04x:%s", pMsg, code & 0XFFFF, terrstr()); dError("msg:%p, failed to process since 0x%04x:%s", pMsg, code & 0XFFFF, terrstr());
if (pRpc->msgType & 1U) { if (pRpc->msgType & 1U) {
SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno}; SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno};
dndSendRsp(pWrapper, &rsp); tmsgSendRsp(&rsp);
} }
dTrace("msg:%p, is freed", pMsg); dTrace("msg:%p, is freed", pMsg);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
......
...@@ -36,8 +36,7 @@ static inline void dndProcessQMVnodeRpcMsg(SMsgHandle *pHandle, SRpcMsg *pMsg, S ...@@ -36,8 +36,7 @@ static inline void dndProcessQMVnodeRpcMsg(SMsgHandle *pHandle, SRpcMsg *pMsg, S
dndProcessRpcMsg(pWrapper, pMsg, pEpSet); dndProcessRpcMsg(pWrapper, pMsg, pEpSet);
} }
static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { static void dndProcessResponse(SDnode *pDnode, SRpcMsg *pRsp, SEpSet *pEpSet) {
SDnode *pDnode = parent;
STransMgmt *pMgmt = &pDnode->trans; STransMgmt *pMgmt = &pDnode->trans;
tmsg_t msgType = pRsp->msgType; tmsg_t msgType = pRsp->msgType;
...@@ -69,7 +68,7 @@ int32_t dndInitClient(SDnode *pDnode) { ...@@ -69,7 +68,7 @@ int32_t dndInitClient(SDnode *pDnode) {
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.label = "DND"; rpcInit.label = "DND";
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;
rpcInit.cfp = dndProcessResponse; rpcInit.cfp = (RpcCfp)dndProcessResponse;
rpcInit.sessions = 1024; rpcInit.sessions = 1024;
rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.idleTime = tsShellActivityTimer * 1000;
...@@ -101,8 +100,7 @@ void dndCleanupClient(SDnode *pDnode) { ...@@ -101,8 +100,7 @@ void dndCleanupClient(SDnode *pDnode) {
} }
} }
static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) { static void dndProcessRequest(SDnode *pDnode, SRpcMsg *pReq, SEpSet *pEpSet) {
SDnode *pDnode = param;
STransMgmt *pMgmt = &pDnode->trans; STransMgmt *pMgmt = &pDnode->trans;
tmsg_t msgType = pReq->msgType; tmsg_t msgType = pReq->msgType;
...@@ -179,10 +177,8 @@ static int32_t dndGetHideUserAuth(SDnode *pDnode, char *user, char *spi, char *e ...@@ -179,10 +177,8 @@ static int32_t dndGetHideUserAuth(SDnode *pDnode, char *user, char *spi, char *e
return code; return code;
} }
static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey) { static int32_t dndRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) {
SDnode *pDnode = parent; if (dndGetHideUserAuth(pDnode, user, spi, encrypt, secret, ckey) == 0) {
if (dndGetHideUserAuth(parent, user, spi, encrypt, secret, ckey) == 0) {
dTrace("user:%s, get auth from mnode, spi:%d encrypt:%d", user, *spi, *encrypt); dTrace("user:%s, get auth from mnode, spi:%d encrypt:%d", user, *spi, *encrypt);
return 0; return 0;
} }
...@@ -244,11 +240,11 @@ int32_t dndInitServer(SDnode *pDnode) { ...@@ -244,11 +240,11 @@ int32_t dndInitServer(SDnode *pDnode) {
rpcInit.localPort = pDnode->serverPort; rpcInit.localPort = pDnode->serverPort;
rpcInit.label = "DND"; rpcInit.label = "DND";
rpcInit.numOfThreads = numOfThreads; rpcInit.numOfThreads = numOfThreads;
rpcInit.cfp = dndProcessRequest; rpcInit.cfp = (RpcCfp)dndProcessRequest;
rpcInit.sessions = tsMaxShellConns; rpcInit.sessions = tsMaxShellConns;
rpcInit.connType = TAOS_CONN_SERVER; rpcInit.connType = TAOS_CONN_SERVER;
rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.afp = dndRetrieveUserAuthInfo; rpcInit.afp = (RpcAfp)dndRetrieveUserAuthInfo;
rpcInit.parent = pDnode; rpcInit.parent = pDnode;
pMgmt->serverRpc = rpcOpen(&rpcInit); pMgmt->serverRpc = rpcOpen(&rpcInit);
...@@ -363,6 +359,7 @@ int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) ...@@ -363,6 +359,7 @@ int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq)
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
memcpy(pHead, pReq, sizeof(SRpcMsg)); memcpy(pHead, pReq, sizeof(SRpcMsg));
memcpy(pHead + sizeof(SRpcMsg), pEpSet, sizeof(SEpSet)); memcpy(pHead + sizeof(SRpcMsg), pEpSet, sizeof(SEpSet));
...@@ -372,7 +369,7 @@ int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) ...@@ -372,7 +369,7 @@ int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq)
} }
} }
void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) { static void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
if (pWrapper->procType != PROC_CHILD) { if (pWrapper->procType != PROC_CHILD) {
dndSendRpcRsp(pWrapper, pRsp); dndSendRpcRsp(pWrapper, pRsp);
} else { } else {
...@@ -380,7 +377,7 @@ void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) { ...@@ -380,7 +377,7 @@ void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) {
} }
} }
void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { static void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
if (pWrapper->procType != PROC_CHILD) { if (pWrapper->procType != PROC_CHILD) {
rpcRegisterBrokenLinkArg(pMsg); rpcRegisterBrokenLinkArg(pMsg);
} else { } else {
...@@ -406,4 +403,4 @@ SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper) { ...@@ -406,4 +403,4 @@ SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper) {
.releaseHandleFp = dndReleaseHandle, .releaseHandleFp = dndReleaseHandle,
}; };
return msgCb; return msgCb;
} }
\ No newline at end of file
...@@ -37,7 +37,7 @@ static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { ...@@ -37,7 +37,7 @@ static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
dError("msg:%p, failed to process since %s", pMsg, terrstr()); dError("msg:%p, failed to process since %s", pMsg, terrstr());
} }
SRpcMsg rsp = {.handle = pRpc->handle, .code = code, .contLen = pMsg->rspLen, .pCont = pMsg->pRsp}; SRpcMsg rsp = {.handle = pRpc->handle, .code = code, .contLen = pMsg->rspLen, .pCont = pMsg->pRsp};
dndSendRsp(pMgmt->pWrapper, &rsp); tmsgSendRsp(&rsp);
} }
} }
...@@ -60,7 +60,7 @@ static void mmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { ...@@ -60,7 +60,7 @@ static void mmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
if (pRpc->handle != NULL && code != 0) { if (pRpc->handle != NULL && code != 0) {
dError("msg:%p, failed to process since %s", pMsg, terrstr()); dError("msg:%p, failed to process since %s", pMsg, terrstr());
SRpcMsg rsp = {.handle = pRpc->handle, .code = code, .ahandle = pRpc->ahandle}; SRpcMsg rsp = {.handle = pRpc->handle, .code = code, .ahandle = pRpc->ahandle};
dndSendRsp(pMgmt->pWrapper, &rsp); tmsgSendRsp(&rsp);
} }
} }
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
static void qmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) { static void qmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) {
SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code}; SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code};
dndSendRsp(pWrapper, &rsp); tmsgSendRsp(&rsp);
} }
static void qmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { static void qmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
static void vmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) { static void vmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) {
SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code}; SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code};
dndSendRsp(pWrapper, &rsp); tmsgSendRsp(&rsp);
} }
static void vmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { static void vmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
...@@ -116,7 +116,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO ...@@ -116,7 +116,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
int32_t code = vnodeApplyWMsg(pVnode->pImpl, pRpc, &pRsp); int32_t code = vnodeApplyWMsg(pVnode->pImpl, pRpc, &pRsp);
if (pRsp != NULL) { if (pRsp != NULL) {
pRsp->ahandle = pRpc->ahandle; pRsp->ahandle = pRpc->ahandle;
dndSendRsp(pVnode->pWrapper, pRsp); tmsgSendRsp(pRsp);
taosMemoryFree(pRsp); taosMemoryFree(pRsp);
} else { } else {
if (code != 0 && terrno != 0) code = terrno; if (code != 0 && terrno != 0) code = terrno;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册