diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 157e0cb721637c342001cb646367f8027a000ebe..3e2f5967840805cde8b9a37d4e437900965f42e3 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -58,6 +58,9 @@ typedef struct { void *pNode; } SNodeMsg; +typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *); +typedef int (*RpcAfp)(void *parent, char *tableId, char *spi, char *encrypt, char *secret, char *ckey); + typedef struct SRpcInit { uint16_t localPort; // local port char * label; // for debug purpose @@ -74,10 +77,10 @@ typedef struct SRpcInit { char *ckey; // ciphering key // call back to process incoming msg, code shall be ignored by server app - void (*cfp)(void *parent, SRpcMsg *, SEpSet *); + RpcCfp cfp; // call back to retrieve the client auth info, for server app only - int (*afp)(void *parent, char *tableId, char *spi, char *encrypt, char *secret, char *ckey); + RpcAfp afp;; void *parent; } SRpcInit; diff --git a/source/dnode/mgmt/bm/src/bmWorker.c b/source/dnode/mgmt/bm/src/bmWorker.c index 932c008e34cf58614824a43e1f334d5f21d0e5c3..a5a97f6af0c25febcc3eea44544381228c3bc215 100644 --- a/source/dnode/mgmt/bm/src/bmWorker.c +++ b/source/dnode/mgmt/bm/src/bmWorker.c @@ -18,7 +18,7 @@ static void bmSendErrorRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) { SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code}; - dndSendRsp(pWrapper, &rpcRsp); + tmsgSendRsp(&rpcRsp); dTrace("msg:%p, is freed", pMsg); rpcFreeCont(pMsg->rpcMsg.pCont); diff --git a/source/dnode/mgmt/main/inc/dnd.h b/source/dnode/mgmt/main/inc/dnd.h index f3a409f2575d40ab51cfacfa432a5c0afe153540..cf33787108ec453b78165417fbcb7ef8d2a56ada 100644 --- a/source/dnode/mgmt/main/inc/dnd.h +++ b/source/dnode/mgmt/main/inc/dnd.h @@ -166,8 +166,6 @@ int32_t dndInitClient(SDnode *pDnode); void dndCleanupClient(SDnode *pDnode); int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pMsg); -void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp); -void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper); #ifdef __cplusplus diff --git a/source/dnode/mgmt/main/src/dndExec.c b/source/dnode/mgmt/main/src/dndExec.c index 8837da6d48fc2d995f8045dd070311fc34531e0f..3bbdb1a686af650b63275ed58f5ca939927b2c30 100644 --- a/source/dnode/mgmt/main/src/dndExec.c +++ b/source/dnode/mgmt/main/src/dndExec.c @@ -78,7 +78,7 @@ static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t dError("msg:%p, failed to process since code:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); if (pRpc->msgType & 1U) { SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno}; - dndSendRsp(pWrapper, &rsp); + tmsgSendRsp(&rsp); } dTrace("msg:%p, is freed", pMsg); diff --git a/source/dnode/mgmt/main/src/dndMsg.c b/source/dnode/mgmt/main/src/dndMsg.c index bb0f8ccb895e0e2e0bd30c1d6ba748b86d1d885c..76b66467bf4f81d0c01155170a93c4e58c177743 100644 --- a/source/dnode/mgmt/main/src/dndMsg.c +++ b/source/dnode/mgmt/main/src/dndMsg.c @@ -86,7 +86,7 @@ _OVER: dError("msg:%p, failed to process since 0x%04x:%s", pMsg, code & 0XFFFF, terrstr()); if (pRpc->msgType & 1U) { SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno}; - dndSendRsp(pWrapper, &rsp); + tmsgSendRsp(&rsp); } dTrace("msg:%p, is freed", pMsg); taosFreeQitem(pMsg); diff --git a/source/dnode/mgmt/main/src/dndTransport.c b/source/dnode/mgmt/main/src/dndTransport.c index d463f2ba7a137252c6c93529bacb4feca0b2c920..f863db2a0ca42c856c3d74f7edb991f8e9c112bd 100644 --- a/source/dnode/mgmt/main/src/dndTransport.c +++ b/source/dnode/mgmt/main/src/dndTransport.c @@ -36,8 +36,7 @@ static inline void dndProcessQMVnodeRpcMsg(SMsgHandle *pHandle, SRpcMsg *pMsg, S dndProcessRpcMsg(pWrapper, pMsg, pEpSet); } -static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { - SDnode *pDnode = parent; +static void dndProcessResponse(SDnode *pDnode, SRpcMsg *pRsp, SEpSet *pEpSet) { STransMgmt *pMgmt = &pDnode->trans; tmsg_t msgType = pRsp->msgType; @@ -69,7 +68,7 @@ int32_t dndInitClient(SDnode *pDnode) { memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.label = "DND"; rpcInit.numOfThreads = 1; - rpcInit.cfp = dndProcessResponse; + rpcInit.cfp = (RpcCfp)dndProcessResponse; rpcInit.sessions = 1024; rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.idleTime = tsShellActivityTimer * 1000; @@ -101,8 +100,7 @@ void dndCleanupClient(SDnode *pDnode) { } } -static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) { - SDnode *pDnode = param; +static void dndProcessRequest(SDnode *pDnode, SRpcMsg *pReq, SEpSet *pEpSet) { STransMgmt *pMgmt = &pDnode->trans; tmsg_t msgType = pReq->msgType; @@ -179,10 +177,8 @@ static int32_t dndGetHideUserAuth(SDnode *pDnode, char *user, char *spi, char *e return code; } -static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char *encrypt, char *secret, char *ckey) { - SDnode *pDnode = parent; - - if (dndGetHideUserAuth(parent, user, spi, encrypt, secret, ckey) == 0) { +static int32_t dndRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) { + if (dndGetHideUserAuth(pDnode, user, spi, encrypt, secret, ckey) == 0) { dTrace("user:%s, get auth from mnode, spi:%d encrypt:%d", user, *spi, *encrypt); return 0; } @@ -244,11 +240,11 @@ int32_t dndInitServer(SDnode *pDnode) { rpcInit.localPort = pDnode->serverPort; rpcInit.label = "DND"; rpcInit.numOfThreads = numOfThreads; - rpcInit.cfp = dndProcessRequest; + rpcInit.cfp = (RpcCfp)dndProcessRequest; rpcInit.sessions = tsMaxShellConns; rpcInit.connType = TAOS_CONN_SERVER; rpcInit.idleTime = tsShellActivityTimer * 1000; - rpcInit.afp = dndRetrieveUserAuthInfo; + rpcInit.afp = (RpcAfp)dndRetrieveUserAuthInfo; rpcInit.parent = pDnode; pMgmt->serverRpc = rpcOpen(&rpcInit); @@ -363,6 +359,7 @@ int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } + memcpy(pHead, pReq, sizeof(SRpcMsg)); memcpy(pHead + sizeof(SRpcMsg), pEpSet, sizeof(SEpSet)); @@ -372,7 +369,7 @@ int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) } } -void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) { +static void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) { if (pWrapper->procType != PROC_CHILD) { dndSendRpcRsp(pWrapper, pRsp); } else { @@ -380,7 +377,7 @@ void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) { } } -void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { +static void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { if (pWrapper->procType != PROC_CHILD) { rpcRegisterBrokenLinkArg(pMsg); } else { @@ -406,4 +403,4 @@ SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper) { .releaseHandleFp = dndReleaseHandle, }; return msgCb; -} \ No newline at end of file +} diff --git a/source/dnode/mgmt/mm/src/mmWorker.c b/source/dnode/mgmt/mm/src/mmWorker.c index c4aafe05e4dcb69121159e31235711e6fb8dab45..cde4eb853a3508e668180b049fe90a6b18bde79c 100644 --- a/source/dnode/mgmt/mm/src/mmWorker.c +++ b/source/dnode/mgmt/mm/src/mmWorker.c @@ -37,7 +37,7 @@ static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { dError("msg:%p, failed to process since %s", pMsg, terrstr()); } SRpcMsg rsp = {.handle = pRpc->handle, .code = code, .contLen = pMsg->rspLen, .pCont = pMsg->pRsp}; - dndSendRsp(pMgmt->pWrapper, &rsp); + tmsgSendRsp(&rsp); } } @@ -60,7 +60,7 @@ static void mmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { if (pRpc->handle != NULL && code != 0) { dError("msg:%p, failed to process since %s", pMsg, terrstr()); SRpcMsg rsp = {.handle = pRpc->handle, .code = code, .ahandle = pRpc->ahandle}; - dndSendRsp(pMgmt->pWrapper, &rsp); + tmsgSendRsp(&rsp); } } diff --git a/source/dnode/mgmt/qm/src/qmWorker.c b/source/dnode/mgmt/qm/src/qmWorker.c index 14efb311b17681357a5547d6ff8c44245331340b..5ceb9dbf88f427472d68c021bc7f3be099dfad7e 100644 --- a/source/dnode/mgmt/qm/src/qmWorker.c +++ b/source/dnode/mgmt/qm/src/qmWorker.c @@ -18,7 +18,7 @@ static void qmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) { SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code}; - dndSendRsp(pWrapper, &rsp); + tmsgSendRsp(&rsp); } static void qmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { diff --git a/source/dnode/mgmt/vm/src/vmWorker.c b/source/dnode/mgmt/vm/src/vmWorker.c index 4be6311cf8d597cdf2303c11298b83e2e842cf70..c228e6e7ddc6c4e11d5ecfc27e87367b03af469e 100644 --- a/source/dnode/mgmt/vm/src/vmWorker.c +++ b/source/dnode/mgmt/vm/src/vmWorker.c @@ -18,7 +18,7 @@ static void vmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) { SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code}; - dndSendRsp(pWrapper, &rsp); + tmsgSendRsp(&rsp); } static void vmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { @@ -116,7 +116,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO int32_t code = vnodeApplyWMsg(pVnode->pImpl, pRpc, &pRsp); if (pRsp != NULL) { pRsp->ahandle = pRpc->ahandle; - dndSendRsp(pVnode->pWrapper, pRsp); + tmsgSendRsp(pRsp); taosMemoryFree(pRsp); } else { if (code != 0 && terrno != 0) code = terrno;