提交 58ecd5c3 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

submit memory leak for wal or forward case

上级 159f6230
...@@ -210,6 +210,7 @@ static void *dnodeProcessWriteQueue(void *param) { ...@@ -210,6 +210,7 @@ static void *dnodeProcessWriteQueue(void *param) {
int32_t numOfMsgs; int32_t numOfMsgs;
int type; int type;
void *pVnode, *item; void *pVnode, *item;
SRspRet *pRspRet;
dDebug("write worker:%d is running", pWorker->workerId); dDebug("write worker:%d is running", pWorker->workerId);
...@@ -222,9 +223,11 @@ static void *dnodeProcessWriteQueue(void *param) { ...@@ -222,9 +223,11 @@ static void *dnodeProcessWriteQueue(void *param) {
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
pWrite = NULL; pWrite = NULL;
pRspRet = NULL;
taosGetQitem(pWorker->qall, &type, &item); taosGetQitem(pWorker->qall, &type, &item);
if (type == TAOS_QTYPE_RPC) { if (type == TAOS_QTYPE_RPC) {
pWrite = (SWriteMsg *)item; pWrite = (SWriteMsg *)item;
pRspRet = &pWrite->rspRet;
pHead = (SWalHead *)(pWrite->pCont - sizeof(SWalHead)); pHead = (SWalHead *)(pWrite->pCont - sizeof(SWalHead));
pHead->msgType = pWrite->rpcMsg.msgType; pHead->msgType = pWrite->rpcMsg.msgType;
pHead->version = 0; pHead->version = 0;
...@@ -234,7 +237,7 @@ static void *dnodeProcessWriteQueue(void *param) { ...@@ -234,7 +237,7 @@ static void *dnodeProcessWriteQueue(void *param) {
pHead = (SWalHead *)item; pHead = (SWalHead *)item;
} }
int32_t code = vnodeProcessWrite(pVnode, type, pHead, item); int32_t code = vnodeProcessWrite(pVnode, type, pHead, pRspRet);
if (pWrite) pWrite->rpcMsg.code = code; if (pWrite) pWrite->rpcMsg.code = code;
} }
......
...@@ -538,6 +538,7 @@ void rpcCancelRequest(void *handle) { ...@@ -538,6 +538,7 @@ void rpcCancelRequest(void *handle) {
if (pContext->pConn) { if (pContext->pConn) {
tDebug("%s, app trys to cancel request", pContext->pConn->info); tDebug("%s, app trys to cancel request", pContext->pConn->info);
pContext->pConn->pReqMsg = NULL;
rpcCloseConn(pContext->pConn); rpcCloseConn(pContext->pConn);
pContext->pConn = NULL; pContext->pConn = NULL;
rpcFreeCont(pContext->pCont); rpcFreeCont(pContext->pCont);
...@@ -958,7 +959,7 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) { ...@@ -958,7 +959,7 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) {
if (pConn->outType) { if (pConn->outType) {
SRpcReqContext *pContext = pConn->pContext; SRpcReqContext *pContext = pConn->pContext;
pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
pConn->pReq = NULL; pConn->pReqMsg = NULL;
taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl); taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl);
} }
...@@ -1061,7 +1062,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { ...@@ -1061,7 +1062,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) {
SRpcReqContext *pContext = pConn->pContext; SRpcReqContext *pContext = pConn->pContext;
rpcMsg.handle = pContext; rpcMsg.handle = pContext;
pConn->pContext = NULL; pConn->pContext = NULL;
pConn->pReq = NULL; pConn->pReqMsg = NULL;
// for UDP, port may be changed by server, the port in ipSet shall be used for cache // for UDP, port may be changed by server, the port in ipSet shall be used for cache
if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) { if (pHead->code != TSDB_CODE_RPC_TOO_SLOW) {
...@@ -1298,7 +1299,7 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { ...@@ -1298,7 +1299,7 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) {
tDebug("%s, failed to send msg:%s to %s:%hu", pConn->info, taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort); tDebug("%s, failed to send msg:%s to %s:%hu", pConn->info, taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort);
if (pConn->pContext) { if (pConn->pContext) {
pConn->pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; pConn->pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
pConn->pReq = NULL; pConn->pReqMsg = NULL;
taosTmrStart(rpcProcessConnError, 0, pConn->pContext, pRpc->tmrCtrl); taosTmrStart(rpcProcessConnError, 0, pConn->pContext, pRpc->tmrCtrl);
rpcReleaseConn(pConn); rpcReleaseConn(pConn);
} }
......
...@@ -97,18 +97,17 @@ void vnodeConfirmForward(void *param, uint64_t version, int32_t code) { ...@@ -97,18 +97,17 @@ void vnodeConfirmForward(void *param, uint64_t version, int32_t code) {
static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) { static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
vTrace("vgId:%d, submit msg is processed", pVnode->vgId);
// save insert result into item // save insert result into item
SShellSubmitRspMsg *pRsp = NULL;
if (pRet) {
pRet->len = sizeof(SShellSubmitRspMsg);
pRet->rsp = rpcMallocCont(pRet->len);
pRsp = pRet->rsp;
}
vTrace("vgId:%d, submit msg is processed", pVnode->vgId);
pRet->len = sizeof(SShellSubmitRspMsg);
pRet->rsp = rpcMallocCont(pRet->len);
SShellSubmitRspMsg *pRsp = pRet->rsp;
if (tsdbInsertData(pVnode->tsdb, pCont, pRsp) < 0) code = terrno; if (tsdbInsertData(pVnode->tsdb, pCont, pRsp) < 0) code = terrno;
pRsp->numOfFailedBlocks = 0; //TODO
//pRet->len += pRsp->numOfFailedBlocks * sizeof(SShellSubmitRspBlock); //TODO
pRsp->code = 0;
pRsp->numOfRows = htonl(1);
return code; return code;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册