未验证 提交 b18cd2d4 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #3760 from taosdata/patch/TD-1632

Patch/td 1632
...@@ -323,7 +323,7 @@ void *rpcMallocCont(int contLen) { ...@@ -323,7 +323,7 @@ void *rpcMallocCont(int contLen) {
tError("failed to malloc msg, size:%d", size); tError("failed to malloc msg, size:%d", size);
return NULL; return NULL;
} else { } else {
tDebug("malloc msg: %p", start); tTrace("malloc mem: %p", start);
} }
return start + sizeof(SRpcReqContext) + sizeof(SRpcHead); return start + sizeof(SRpcReqContext) + sizeof(SRpcHead);
...@@ -333,7 +333,7 @@ void rpcFreeCont(void *cont) { ...@@ -333,7 +333,7 @@ void rpcFreeCont(void *cont) {
if (cont) { if (cont) {
char *temp = ((char *)cont) - sizeof(SRpcHead) - sizeof(SRpcReqContext); char *temp = ((char *)cont) - sizeof(SRpcHead) - sizeof(SRpcReqContext);
free(temp); free(temp);
tDebug("free mem: %p", temp); tTrace("free mem: %p", temp);
} }
} }
...@@ -553,7 +553,7 @@ static void rpcFreeMsg(void *msg) { ...@@ -553,7 +553,7 @@ static void rpcFreeMsg(void *msg) {
if ( msg ) { if ( msg ) {
char *temp = (char *)msg - sizeof(SRpcReqContext); char *temp = (char *)msg - sizeof(SRpcReqContext);
free(temp); free(temp);
tDebug("free msg: %p", temp); tTrace("free mem: %p", temp);
} }
} }
...@@ -819,9 +819,18 @@ static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) { ...@@ -819,9 +819,18 @@ static int rpcProcessReqHead(SRpcConn *pConn, SRpcHead *pHead) {
return TSDB_CODE_RPC_LAST_SESSION_NOT_FINISHED; return TSDB_CODE_RPC_LAST_SESSION_NOT_FINISHED;
} }
if (rpcContLenFromMsg(pHead->msgLen) <= 0) {
tDebug("%s, message body is empty, ignore", pConn->info);
return TSDB_CODE_RPC_APP_ERROR;
}
pConn->inTranId = pHead->tranId; pConn->inTranId = pHead->tranId;
pConn->inType = pHead->msgType; pConn->inType = pHead->msgType;
// start the progress timer to monitor the response from server app
if (pConn->connType != RPC_CONN_TCPS)
pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pConn->pRpc->tmrCtrl);
return 0; return 0;
} }
...@@ -960,11 +969,7 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont ...@@ -960,11 +969,7 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv, SRpcReqCont
pConn->pIdleTimer = taosTmrStart(rpcProcessIdleTimer, tsRpcTimer*2, pConn, pRpc->tmrCtrl); pConn->pIdleTimer = taosTmrStart(rpcProcessIdleTimer, tsRpcTimer*2, pConn, pRpc->tmrCtrl);
} else { } else {
terrno = rpcProcessRspHead(pConn, pHead); terrno = rpcProcessRspHead(pConn, pHead);
if (terrno == 0) { *ppContext = pConn->pContext;
SRpcReqContext *pContext = pConn->pContext;
*ppContext = pContext;
pConn->pContext = NULL;
}
} }
} }
...@@ -1094,20 +1099,11 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte ...@@ -1094,20 +1099,11 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte
if ( rpcIsReq(pHead->msgType) ) { if ( rpcIsReq(pHead->msgType) ) {
rpcMsg.ahandle = pConn->ahandle; rpcMsg.ahandle = pConn->ahandle;
if (rpcMsg.contLen > 0) {
rpcMsg.handle = pConn; rpcMsg.handle = pConn;
rpcAddRef(pRpc); // add the refCount for requests rpcAddRef(pRpc); // add the refCount for requests
// start the progress timer to monitor the response from server app
if (pConn->connType != RPC_CONN_TCPS)
pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pRpc->tmrCtrl);
// notify the server app // notify the server app
(*(pRpc->cfp))(&rpcMsg, NULL); (*(pRpc->cfp))(&rpcMsg, NULL);
} else {
tDebug("%s, message body is empty, ignore", pConn->info);
rpcFreeCont(rpcMsg.pCont);
}
} else { } else {
// it's a response // it's a response
rpcMsg.handle = pContext; rpcMsg.handle = pContext;
...@@ -1451,7 +1447,7 @@ static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) { ...@@ -1451,7 +1447,7 @@ static SRpcHead *rpcDecompressRpcMsg(SRpcHead *pHead) {
pNewHead->msgLen = rpcMsgLenFromCont(origLen); pNewHead->msgLen = rpcMsgLenFromCont(origLen);
rpcFreeMsg(pHead); // free the compressed message buffer rpcFreeMsg(pHead); // free the compressed message buffer
pHead = pNewHead; pHead = pNewHead;
//tTrace("decompress rpc msg, compLen:%d, after:%d", compLen, contLen); tTrace("decomp malloc mem: %p", temp);
} else { } else {
tError("failed to allocate memory to decompress msg, contLen:%d", contLen); tError("failed to allocate memory to decompress msg, contLen:%d", contLen);
} }
......
...@@ -438,7 +438,7 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) { ...@@ -438,7 +438,7 @@ static int taosReadTcpData(SFdObj *pFdObj, SRecvInfo *pInfo) {
tError("%s %p TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen); tError("%s %p TCP malloc(size:%d) fail", pThreadObj->label, pFdObj->thandle, msgLen);
return -1; return -1;
} else { } else {
tDebug("TCP malloc mem: %p", buffer); tTrace("TCP malloc mem: %p", buffer);
} }
msg = buffer + tsRpcOverhead; msg = buffer + tsRpcOverhead;
......
...@@ -214,7 +214,7 @@ static void *taosRecvUdpData(void *param) { ...@@ -214,7 +214,7 @@ static void *taosRecvUdpData(void *param) {
tError("%s failed to allocate memory, size:%" PRId64, pConn->label, (int64_t)dataLen); tError("%s failed to allocate memory, size:%" PRId64, pConn->label, (int64_t)dataLen);
continue; continue;
} else { } else {
tDebug("UDP malloc mem: %p", tmsg); tTrace("UDP malloc mem: %p", tmsg);
} }
tmsg += tsRpcOverhead; // overhead for SRpcReqContext tmsg += tsRpcOverhead; // overhead for SRpcReqContext
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册