未验证 提交 0ecd7c70 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2575 from taosdata/hotfix/rpcLeak

Hotfix/rpc leak
...@@ -210,6 +210,7 @@ static void *dnodeProcessWriteQueue(void *param) { ...@@ -210,6 +210,7 @@ static void *dnodeProcessWriteQueue(void *param) {
int32_t numOfMsgs; int32_t numOfMsgs;
int type; int type;
void *pVnode, *item; void *pVnode, *item;
SRspRet *pRspRet;
dDebug("write worker:%d is running", pWorker->workerId); dDebug("write worker:%d is running", pWorker->workerId);
...@@ -222,9 +223,11 @@ static void *dnodeProcessWriteQueue(void *param) { ...@@ -222,9 +223,11 @@ static void *dnodeProcessWriteQueue(void *param) {
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
pWrite = NULL; pWrite = NULL;
pRspRet = NULL;
taosGetQitem(pWorker->qall, &type, &item); taosGetQitem(pWorker->qall, &type, &item);
if (type == TAOS_QTYPE_RPC) { if (type == TAOS_QTYPE_RPC) {
pWrite = (SWriteMsg *)item; pWrite = (SWriteMsg *)item;
pRspRet = &pWrite->rspRet;
pHead = (SWalHead *)(pWrite->pCont - sizeof(SWalHead)); pHead = (SWalHead *)(pWrite->pCont - sizeof(SWalHead));
pHead->msgType = pWrite->rpcMsg.msgType; pHead->msgType = pWrite->rpcMsg.msgType;
pHead->version = 0; pHead->version = 0;
...@@ -234,7 +237,7 @@ static void *dnodeProcessWriteQueue(void *param) { ...@@ -234,7 +237,7 @@ static void *dnodeProcessWriteQueue(void *param) {
pHead = (SWalHead *)item; pHead = (SWalHead *)item;
} }
int32_t code = vnodeProcessWrite(pVnode, type, pHead, item); int32_t code = vnodeProcessWrite(pVnode, type, pHead, pRspRet);
if (pWrite) pWrite->rpcMsg.code = code; if (pWrite) pWrite->rpcMsg.code = code;
} }
......
...@@ -538,6 +538,7 @@ void rpcCancelRequest(void *handle) { ...@@ -538,6 +538,7 @@ void rpcCancelRequest(void *handle) {
if (pContext->pConn) { if (pContext->pConn) {
tDebug("%s, app trys to cancel request", pContext->pConn->info); tDebug("%s, app trys to cancel request", pContext->pConn->info);
pContext->pConn->pReqMsg = NULL;
rpcCloseConn(pContext->pConn); rpcCloseConn(pContext->pConn);
pContext->pConn = NULL; pContext->pConn = NULL;
rpcFreeCont(pContext->pCont); rpcFreeCont(pContext->pCont);
...@@ -601,11 +602,10 @@ static void rpcReleaseConn(SRpcConn *pConn) { ...@@ -601,11 +602,10 @@ static void rpcReleaseConn(SRpcConn *pConn) {
taosHashRemove(pRpc->hash, hashstr, size); taosHashRemove(pRpc->hash, hashstr, size);
rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg
pConn->pRspMsg = NULL; pConn->pRspMsg = NULL;
if (pConn->pReqMsg) rpcFreeCont(pConn->pReqMsg);
} }
// memset could not be used, since lockeBy can not be reset // memset could not be used, since lockeBy can not be reset
if (pConn->pReqMsg) rpcFreeCont(pConn->pReqMsg);
pConn->inType = 0; pConn->inType = 0;
pConn->outType = 0; pConn->outType = 0;
pConn->inTranId = 0; pConn->inTranId = 0;
...@@ -959,6 +959,7 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) { ...@@ -959,6 +959,7 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) {
if (pConn->outType) { if (pConn->outType) {
SRpcReqContext *pContext = pConn->pContext; SRpcReqContext *pContext = pConn->pContext;
pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
pConn->pReqMsg = NULL;
taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl); taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl);
} }
...@@ -1061,6 +1062,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { ...@@ -1061,6 +1062,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
SRpcReqContext *pContext = pConn->pContext; SRpcReqContext *pContext = pConn->pContext;
rpcMsg.handle = pContext; rpcMsg.handle = pContext;
pConn->pContext = NULL; pConn->pContext = NULL;
pConn->pReqMsg = NULL;
// for UDP, port may be changed by server, the port in ipSet shall be used for cache // for UDP, port may be changed by server, the port in ipSet shall be used for cache
if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) { if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) {
...@@ -1297,6 +1299,7 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { ...@@ -1297,6 +1299,7 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) {
tDebug("%s, failed to send msg:%s to %s:%hu", pConn->info, taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort); tDebug("%s, failed to send msg:%s to %s:%hu", pConn->info, taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort);
if (pConn->pContext) { if (pConn->pContext) {
pConn->pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; pConn->pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
pConn->pReqMsg = NULL;
taosTmrStart(rpcProcessConnError, 0, pConn->pContext, pRpc->tmrCtrl); taosTmrStart(rpcProcessConnError, 0, pConn->pContext, pRpc->tmrCtrl);
rpcReleaseConn(pConn); rpcReleaseConn(pConn);
} }
......
...@@ -97,18 +97,17 @@ void vnodeConfirmForward(void *param, uint64_t version, int32_t code) { ...@@ -97,18 +97,17 @@ void vnodeConfirmForward(void *param, uint64_t version, int32_t code) {
static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) { static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
vTrace("vgId:%d, submit msg is processed", pVnode->vgId);
// save insert result into item // save insert result into item
SShellSubmitRspMsg *pRsp = NULL;
if (pRet) {
pRet->len = sizeof(SShellSubmitRspMsg);
pRet->rsp = rpcMallocCont(pRet->len);
pRsp = pRet->rsp;
}
vTrace("vgId:%d, submit msg is processed", pVnode->vgId);
pRet->len = sizeof(SShellSubmitRspMsg);
pRet->rsp = rpcMallocCont(pRet->len);
SShellSubmitRspMsg *pRsp = pRet->rsp;
if (tsdbInsertData(pVnode->tsdb, pCont, pRsp) < 0) code = terrno; if (tsdbInsertData(pVnode->tsdb, pCont, pRsp) < 0) code = terrno;
pRsp->numOfFailedBlocks = 0; //TODO
//pRet->len += pRsp->numOfFailedBlocks * sizeof(SShellSubmitRspBlock); //TODO
pRsp->code = 0;
pRsp->numOfRows = htonl(1);
return code; return code;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册