diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 3dbf666a11039eaccbb516e9bd40a218a99c5a46..4bfb30c03939a142ecdfe5895b1c3f20ba7600d9 100755 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -168,7 +168,7 @@ static void rpcSendQuickRsp(SRpcConn *pConn, int32_t code); static void rpcSendErrorMsgToPeer(SRpcInfo *pRpc, char *pMsg, int32_t code, uint32_t ip, uint16_t port, void *chandle); static void rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen); -static void *rpcProcessMsgFromPeer(void *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *chandle); +static void *rpcProcessMsgFromPeer(void *msg, int msgLen, uint32_t ip, uint16_t port, void *shandle, void *thandle, void *chandle); static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead); static void rpcProcessConnError(void *param, void *id); static void rpcProcessRetryTimer(void *, void *); @@ -323,14 +323,15 @@ void rpcSendResponse(void *handle, int32_t code, void *pCont, int contLen) { int msgLen = 0; SRpcConn *pConn = (SRpcConn *)handle; SRpcInfo *pRpc = pConn->pRpc; - SRpcHead *pHead = rpcHeadFromCont(pCont); - char *msg = (char *)pHead; if ( pCont == NULL ) { pCont = rpcMallocCont(0); contLen = 0; } + SRpcHead *pHead = rpcHeadFromCont(pCont); + char *msg = (char *)pHead; + contLen = rpcCompressRpcMsg(pCont, contLen); msgLen = rpcMsgLenFromCont(contLen); @@ -689,17 +690,17 @@ static int32_t rpcProcessHead(SRpcInfo *pRpc, SRpcConn **ppConn, void *data, int return code; } -static void *rpcProcessMsgFromPeer(void *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *chandle) { - SRpcHead *pHead = (SRpcHead *)data; +static void *rpcProcessMsgFromPeer(void *msg, int msgLen, uint32_t ip, uint16_t port, void *shandle, void *thandle, void *chandle) { + SRpcHead *pHead = (SRpcHead *)msg; SRpcInfo *pRpc = (SRpcInfo *)shandle; SRpcConn *pConn = NULL; int32_t code = 0; - tDump(data, dataLen); + tDump(msg, msgLen); pthread_mutex_lock(&pRpc->mutex); - code = rpcProcessHead(pRpc, &pConn, data, dataLen, ip); + code = rpcProcessHead(pRpc, &pConn, msg, msgLen, ip); if (pConn) { // update connection info @@ -721,7 +722,7 @@ static void *rpcProcessMsgFromPeer(void *data, int dataLen, uint32_t ip, uint16_ if (pHead->msgType < TSDB_MSG_TYPE_HEARTBEAT || (rpcDebugFlag & 16)) { tTrace("%s pConn:%p, %s received from 0x%x:%hu, parse code:%x len:%d source:0x%08x dest:0x%08x tranId:%d", pRpc->label, pConn, taosMsg[pHead->msgType], ip, port, code, - dataLen, pHead->sourceId, pHead->destId, pHead->tranId); + msgLen, pHead->sourceId, pHead->destId, pHead->tranId); } if (pConn && pRpc->idleTime) { @@ -731,7 +732,7 @@ static void *rpcProcessMsgFromPeer(void *data, int dataLen, uint32_t ip, uint16_ if (code != TSDB_CODE_ALREADY_PROCESSED) { if (code != 0) { // parsing error if ( rpcIsReq(pHead->msgType) ) { - rpcSendErrorMsgToPeer(pRpc, data, code, ip, port, chandle); + rpcSendErrorMsgToPeer(pRpc, msg, code, ip, port, chandle); tTrace("%s pConn:%p, %s is sent with error code:%x", pRpc->label, pConn, taosMsg[pHead->msgType+1], code); } } else { // parsing OK @@ -739,7 +740,7 @@ static void *rpcProcessMsgFromPeer(void *data, int dataLen, uint32_t ip, uint16_ } } - if ( code != 0 ) free (data); + if ( code != 0 ) free (msg); return pConn; } @@ -766,8 +767,8 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { rpcSendReqToServer(pRpc, pContext); } else { rpcFreeOutMsg(rpcHeadFromCont(pContext->pCont)); // free the request msg - if ( pContext->ipSet.index != pContext->oldIndex || pContext->redirect ) - (*pRpc->ufp)(pContext->ahandle, pContext->ipSet); + if ( pRpc->ufp && (pContext->ipSet.index != pContext->oldIndex || pContext->redirect) ) + (*pRpc->ufp)(pContext->ahandle, pContext->ipSet); // notify the update of ipSet (*pRpc->cfp)(pHead->msgType, pCont, contLen, pContext->ahandle, pContext->ipSet.index); } } diff --git a/src/rpc/test/rclient.c b/src/rpc/test/rclient.c index cce5d0e345bbaee02342ccf5ba54524a95419eb9..166cf10a91ba9bd5fef39e98a9bbe855939217a5 100644 --- a/src/rpc/test/rclient.c +++ b/src/rpc/test/rclient.c @@ -20,7 +20,7 @@ #include void processMsg(char type, void *pCont, int contLen, void *ahandle, int32_t code) { - dPrint("response is received, type:%d, contLen:%d code:%d, ahandle:%p", type, contLen, code, ahandle); + dPrint("response is received, type:%d, contLen:%d code:%x, ahandle:%p", type, contLen, code, ahandle); } int32_t main(int32_t argc, char *argv[]) {