提交 06775965 编写于 作者: D dapan1121

fix: fix rpc context freed in timer callback issue

上级 e06f1295
......@@ -1080,7 +1080,10 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) {
pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
pContext->pConn = NULL;
pConn->pReqMsg = NULL;
taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl);
taosAcquireRef(tsRpcRefId, pContext->rid);
int64_t *rid = malloc(sizeof(int64_t));
*rid = pContext->rid;
taosTmrStart(rpcProcessConnError, 0, rid, pRpc->tmrCtrl);
}
if (pConn->inType) rpcReportBrokenLinkToServer(pConn);
......@@ -1293,7 +1296,9 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte
rpcFreeCont(rpcMsg.pCont);
} else if (pHead->code == TSDB_CODE_RPC_NOT_READY || pHead->code == TSDB_CODE_APP_NOT_READY || pHead->code == TSDB_CODE_DND_EXITING) {
pContext->code = pHead->code;
rpcProcessConnError(pContext, NULL);
int64_t *rid = malloc(sizeof(int64_t));
*rid = pContext->rid;
rpcProcessConnError(rid, NULL);
rpcFreeCont(rpcMsg.pCont);
} else {
rpcNotifyClient(pContext, &rpcMsg);
......@@ -1395,7 +1400,9 @@ static TBOOL rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
if (pConn == NULL) {
pContext->code = terrno;
// in rpcProcessConnError if numOfTry over limit, could call rpcNotifyClient to stop query
taosTmrStart(rpcProcessConnError, 1, pContext, pRpc->tmrCtrl);
int64_t *rid = malloc(sizeof(int64_t));
*rid = pContext->rid;
taosTmrStart(rpcProcessConnError, 1, rid, pRpc->tmrCtrl);
return BOOL_ASYNC;
}
......@@ -1442,7 +1449,9 @@ static TBOOL rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
// try next ip again
pContext->code = terrno;
// in rpcProcessConnError if numOfTry over limit, could call rpcNotifyClient to stop query
taosTmrStart(rpcProcessConnError, 1, pContext, pRpc->tmrCtrl);
int64_t *rid = malloc(sizeof(int64_t));
*rid = pContext->rid;
taosTmrStart(rpcProcessConnError, 1, rid, pRpc->tmrCtrl);
return BOOL_ASYNC;
}
......@@ -1480,11 +1489,17 @@ static bool rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
}
static void rpcProcessConnError(void *param, void *id) {
SRpcReqContext *pContext = (SRpcReqContext *)param;
int64_t *rid = (int64_t*)param;
SRpcReqContext *pContext = (SRpcReqContext *)taosAcquireRef(tsRpcRefId, *rid);
if (NULL == pContext) {
return;
}
SRpcInfo *pRpc = pContext->pRpc;
SRpcMsg rpcMsg;
if (pRpc == NULL) {
taosReleaseRef(tsRpcRefId, *rid);
return;
}
......@@ -1504,6 +1519,8 @@ static void rpcProcessConnError(void *param, void *id) {
pContext->epSet.inUse = pContext->epSet.inUse % pContext->epSet.numOfEps;
rpcSendReqToServer(pRpc, pContext);
}
taosReleaseRef(tsRpcRefId, *rid);
}
static void rpcProcessRetryTimer(void *param, void *tmrId) {
......@@ -1528,7 +1545,9 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) {
pConn->pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
pConn->pContext->pConn = NULL;
pConn->pReqMsg = NULL;
taosTmrStart(rpcProcessConnError, 1, pConn->pContext, pRpc->tmrCtrl);
int64_t *rid = malloc(sizeof(int64_t));
*rid = pConn->pContext->rid;
taosTmrStart(rpcProcessConnError, 1, rid, pRpc->tmrCtrl);
rpcReleaseConn(pConn);
}
}
......@@ -1892,4 +1911,4 @@ bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext) {
taosReleaseRef(tsRpcRefId, rpcRid);
return true;
}
\ No newline at end of file
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册