未验证 提交 41adb25a 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #17447 from taosdata/fix/TS-1893

fix: fix rpc context freed in timer callback issue
...@@ -1080,7 +1080,9 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) { ...@@ -1080,7 +1080,9 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) {
pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
pContext->pConn = NULL; pContext->pConn = NULL;
pConn->pReqMsg = NULL; pConn->pReqMsg = NULL;
taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl); int64_t *rid = malloc(sizeof(int64_t));
*rid = pContext->rid;
taosTmrStart(rpcProcessConnError, 0, rid, pRpc->tmrCtrl);
} }
if (pConn->inType) rpcReportBrokenLinkToServer(pConn); if (pConn->inType) rpcReportBrokenLinkToServer(pConn);
...@@ -1293,7 +1295,9 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte ...@@ -1293,7 +1295,9 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
} else if (pHead->code == TSDB_CODE_RPC_NOT_READY || pHead->code == TSDB_CODE_APP_NOT_READY || pHead->code == TSDB_CODE_DND_EXITING) { } else if (pHead->code == TSDB_CODE_RPC_NOT_READY || pHead->code == TSDB_CODE_APP_NOT_READY || pHead->code == TSDB_CODE_DND_EXITING) {
pContext->code = pHead->code; pContext->code = pHead->code;
rpcProcessConnError(pContext, NULL); int64_t *rid = malloc(sizeof(int64_t));
*rid = pContext->rid;
rpcProcessConnError(rid, NULL);
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
} else { } else {
rpcNotifyClient(pContext, &rpcMsg); rpcNotifyClient(pContext, &rpcMsg);
...@@ -1395,7 +1399,9 @@ static TBOOL rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { ...@@ -1395,7 +1399,9 @@ static TBOOL rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
if (pConn == NULL) { if (pConn == NULL) {
pContext->code = terrno; pContext->code = terrno;
// in rpcProcessConnError if numOfTry over limit, could call rpcNotifyClient to stop query // in rpcProcessConnError if numOfTry over limit, could call rpcNotifyClient to stop query
taosTmrStart(rpcProcessConnError, 1, pContext, pRpc->tmrCtrl); int64_t *rid = malloc(sizeof(int64_t));
*rid = pContext->rid;
taosTmrStart(rpcProcessConnError, 1, rid, pRpc->tmrCtrl);
return BOOL_ASYNC; return BOOL_ASYNC;
} }
...@@ -1442,7 +1448,9 @@ static TBOOL rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { ...@@ -1442,7 +1448,9 @@ static TBOOL rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
// try next ip again // try next ip again
pContext->code = terrno; pContext->code = terrno;
// in rpcProcessConnError if numOfTry over limit, could call rpcNotifyClient to stop query // in rpcProcessConnError if numOfTry over limit, could call rpcNotifyClient to stop query
taosTmrStart(rpcProcessConnError, 1, pContext, pRpc->tmrCtrl); int64_t *rid = malloc(sizeof(int64_t));
*rid = pContext->rid;
taosTmrStart(rpcProcessConnError, 1, rid, pRpc->tmrCtrl);
return BOOL_ASYNC; return BOOL_ASYNC;
} }
...@@ -1480,11 +1488,23 @@ static bool rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { ...@@ -1480,11 +1488,23 @@ static bool rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
} }
static void rpcProcessConnError(void *param, void *id) { static void rpcProcessConnError(void *param, void *id) {
SRpcReqContext *pContext = (SRpcReqContext *)param; if (NULL == param) {
return;
}
int64_t *rid = (int64_t*)param;
SRpcReqContext *pContext = (SRpcReqContext *)taosAcquireRef(tsRpcRefId, *rid);
if (NULL == pContext) {
free(param);
return;
}
SRpcInfo *pRpc = pContext->pRpc; SRpcInfo *pRpc = pContext->pRpc;
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
if (pRpc == NULL) { if (pRpc == NULL) {
taosReleaseRef(tsRpcRefId, *rid);
free(param);
return; return;
} }
...@@ -1504,6 +1524,9 @@ static void rpcProcessConnError(void *param, void *id) { ...@@ -1504,6 +1524,9 @@ static void rpcProcessConnError(void *param, void *id) {
pContext->epSet.inUse = pContext->epSet.inUse % pContext->epSet.numOfEps; pContext->epSet.inUse = pContext->epSet.inUse % pContext->epSet.numOfEps;
rpcSendReqToServer(pRpc, pContext); rpcSendReqToServer(pRpc, pContext);
} }
taosReleaseRef(tsRpcRefId, *rid);
free(param);
} }
static void rpcProcessRetryTimer(void *param, void *tmrId) { static void rpcProcessRetryTimer(void *param, void *tmrId) {
...@@ -1528,7 +1551,9 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { ...@@ -1528,7 +1551,9 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) {
pConn->pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; pConn->pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
pConn->pContext->pConn = NULL; pConn->pContext->pConn = NULL;
pConn->pReqMsg = NULL; pConn->pReqMsg = NULL;
taosTmrStart(rpcProcessConnError, 1, pConn->pContext, pRpc->tmrCtrl); int64_t *rid = malloc(sizeof(int64_t));
*rid = pConn->pContext->rid;
taosTmrStart(rpcProcessConnError, 1, rid, pRpc->tmrCtrl);
rpcReleaseConn(pConn); rpcReleaseConn(pConn);
} }
} }
...@@ -1892,4 +1917,4 @@ bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext) { ...@@ -1892,4 +1917,4 @@ bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext) {
taosReleaseRef(tsRpcRefId, rpcRid); taosReleaseRef(tsRpcRefId, rpcRid);
return true; return true;
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册