diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 6a5d3b079a804a91444e845a9e0aab024d2b2ad9..f0b8c996c5aec747e69785c469946f80856e05e5 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -195,7 +195,7 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen); static void rpcSendReqHead(SRpcConn *pConn); static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv); -static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext); +static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead); static void rpcProcessConnError(void *param, void *id); static void rpcProcessRetryTimer(void *, void *); static void rpcProcessIdleTimer(void *param, void *tmrId); @@ -885,14 +885,13 @@ static int rpcProcessRspHead(SRpcConn *pConn, SRpcHead *pHead) { return TSDB_CODE_SUCCESS; } -static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqContext **ppContext) { +static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) { int32_t sid; SRpcConn *pConn = NULL; SRpcHead *pHead = (SRpcHead *)pRecv->msg; sid = htonl(pHead->destId); - *ppContext = NULL; if (pHead->msgType >= TSDB_MSG_TYPE_MAX || pHead->msgType <= 0) { tDebug("%s sid:%d, invalid message type:%d", pRpc->label, sid, pHead->msgType); @@ -946,17 +945,6 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont pConn->pIdleTimer = taosTmrStart(rpcProcessIdleTimer, tsRpcTimer*2, pConn, pRpc->tmrCtrl); } else { terrno = rpcProcessRspHead(pConn, pHead); - if (terrno == 0) { - SRpcReqContext *pContext = pConn->pContext; - *ppContext = pContext; - pConn->pContext = NULL; - pConn->pReqMsg = NULL; - - // for UDP, port may be changed by server, the port in epSet shall be used for cache - if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) { - rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.port[pContext->epSet.inUse], pConn->connType); - } - } } } @@ -1021,8 +1009,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { } terrno = 0; - SRpcReqContext *pContext; - pConn = rpcProcessMsgHead(pRpc, pRecv, &pContext); + pConn = rpcProcessMsgHead(pRpc, pRecv); if (pHead->msgType >= 1 && pHead->msgType < TSDB_MSG_TYPE_MAX) { tDebug("%s %p %p, %s received from 0x%x:%hu, parse code:0x%x len:%d sig:0x%08x:0x%08x:%d code:0x%x", pRpc->label, @@ -1042,7 +1029,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { tDebug("%s %p %p, %s is sent with error code:0x%x", pRpc->label, pConn, (void *)pHead->ahandle, taosMsg[pHead->msgType+1], code); } } else { // msg is passed to app only parsing is ok - rpcProcessIncomingMsg(pConn, pHead, pContext); + rpcProcessIncomingMsg(pConn, pHead); } } @@ -1073,7 +1060,7 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) { rpcFreeCont(pContext->pCont); } -static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) { +static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { SRpcInfo *pRpc = pConn->pRpc; SRpcMsg rpcMsg; @@ -1102,10 +1089,15 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte } } else { // it's a response + SRpcReqContext *pContext = pConn->pContext; rpcMsg.handle = pContext; + pConn->pContext = NULL; + pConn->pReqMsg = NULL; // for UDP, port may be changed by server, the port in epSet shall be used for cache - if (pHead->code == TSDB_CODE_RPC_TOO_SLOW) { + if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) { + rpcAddConnIntoCache(pRpc->pCache, pConn, pConn->peerFqdn, pContext->epSet.port[pContext->epSet.inUse], pConn->connType); + } else { rpcCloseConn(pConn); }