diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index b86b95b8585350da9777f2045fa503a3fb81d61e..414d37d8b8c3141784a22bb117a8ec6d92b4096a 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -323,7 +323,7 @@ void *rpcMallocCont(int contLen) { tError("failed to malloc msg, size:%d", size); return NULL; } else { - tDebug("malloc msg: %p", start); + tTrace("malloc mem: %p", start); } return start + sizeof(SRpcReqContext) + sizeof(SRpcHead); @@ -333,7 +333,7 @@ void rpcFreeCont(void *cont) { if (cont) { char *temp = ((char *)cont) - sizeof(SRpcHead) - sizeof(SRpcReqContext); free(temp); - tDebug("free mem: %p", temp); + tTrace("free mem: %p", temp); } } @@ -553,7 +553,7 @@ static void rpcFreeMsg(void *msg) { if ( msg ) { char *temp = (char *)msg - sizeof(SRpcReqContext); free(temp); - tDebug("free msg: %p", temp); + tTrace("free mem: %p", temp); } } @@ -819,9 +819,18 @@ static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) { return TSDB_CODE_RPC_LAST_SESSION_NOT_FINISHED; } + if (rpcContLenFromMsg(pHead->msgLen) <= 0) { + tDebug("%s, message body is empty, ignore", pConn->info); + return TSDB_CODE_RPC_APP_ERROR; + } + pConn->inTranId = pHead->tranId; pConn->inType = pHead->msgType; + // start the progress timer to monitor the response from server app + if (pConn->connType != RPC_CONN_TCPS) + pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pConn->pRpc->tmrCtrl); + return 0; } @@ -960,11 +969,7 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont pConn->pIdleTimer = taosTmrStart(rpcProcessIdleTimer, tsRpcTimer*2, pConn, pRpc->tmrCtrl); } else { terrno = rpcProcessRspHead(pConn, pHead); - if (terrno == 0) { - SRpcReqContext *pContext = pConn->pContext; - *ppContext = pContext; - pConn->pContext = NULL; - } + *ppContext = pConn->pContext; } } @@ -1094,20 +1099,11 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte if ( rpcIsReq(pHead->msgType) ) { rpcMsg.ahandle = pConn->ahandle; - if (rpcMsg.contLen > 0) { - rpcMsg.handle = pConn; - rpcAddRef(pRpc); // add the refCount for requests + rpcMsg.handle = pConn; + rpcAddRef(pRpc); // add the refCount for requests - // start the progress timer to monitor the response from server app - if (pConn->connType != RPC_CONN_TCPS) - pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pRpc->tmrCtrl); - - // notify the server app - (*(pRpc->cfp))(&rpcMsg, NULL); - } else { - tDebug("%s, message body is empty, ignore", pConn->info); - rpcFreeCont(rpcMsg.pCont); - } + // notify the server app + (*(pRpc->cfp))(&rpcMsg, NULL); } else { // it's a response rpcMsg.handle = pContext; @@ -1451,7 +1447,7 @@ static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) { pNewHead->msgLen = rpcMsgLenFromCont(origLen); rpcFreeMsg(pHead); // free the compressed message buffer pHead = pNewHead; - //tTrace("decompress rpc msg, compLen:%d, after:%d", compLen, contLen); + tTrace("decomp malloc mem: %p", temp); } else { tError("failed to allocate memory to decompress msg, contLen:%d", contLen); } diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 20b03b34e351ac6430ed36643261ff062e4b6091..dd9e7684e03e5cfc8c44dc3555a4ad1d144b90b6 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -438,7 +438,7 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) { tError("%s %p TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen); return -1; } else { - tDebug("TCP malloc mem: %p", buffer); + tTrace("TCP malloc mem: %p", buffer); } msg = buffer + tsRpcOverhead; diff --git a/src/rpc/src/rpcUdp.c b/src/rpc/src/rpcUdp.c index 4ea47582b9c93fbd483454a50e1bf6aed5237870..6f653046615f162c516b5eebf08995d30c6214d7 100644 --- a/src/rpc/src/rpcUdp.c +++ b/src/rpc/src/rpcUdp.c @@ -214,7 +214,7 @@ static void *taosRecvUdpData(void *param) { tError("%s failed to allocate memory, size:%" PRId64, pConn->label, (int64_t)dataLen); continue; } else { - tDebug("UDP malloc mem: %p", tmsg); + tTrace("UDP malloc mem: %p", tmsg); } tmsg += tsRpcOverhead; // overhead for SRpcReqContext