diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 6566bf9e79f3f58557a80c037e99027cd19753ba..5c385410080f62b94dc481c597f7a6e847cc4d46 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -1080,7 +1080,9 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) { pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; pContext->pConn = NULL; pConn->pReqMsg = NULL; - taosTmrStart(rpcProcessConnError, 0, pContext, pRpc->tmrCtrl); + int64_t *rid = malloc(sizeof(int64_t)); + *rid = pContext->rid; + taosTmrStart(rpcProcessConnError, 0, rid, pRpc->tmrCtrl); } if (pConn->inType) rpcReportBrokenLinkToServer(pConn); @@ -1293,7 +1295,9 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqConte rpcFreeCont(rpcMsg.pCont); } else if (pHead->code == TSDB_CODE_RPC_NOT_READY || pHead->code == TSDB_CODE_APP_NOT_READY || pHead->code == TSDB_CODE_DND_EXITING) { pContext->code = pHead->code; - rpcProcessConnError(pContext, NULL); + int64_t *rid = malloc(sizeof(int64_t)); + *rid = pContext->rid; + rpcProcessConnError(rid, NULL); rpcFreeCont(rpcMsg.pCont); } else { rpcNotifyClient(pContext, &rpcMsg); @@ -1395,7 +1399,9 @@ static TBOOL rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { if (pConn == NULL) { pContext->code = terrno; // in rpcProcessConnError if numOfTry over limit, could call rpcNotifyClient to stop query - taosTmrStart(rpcProcessConnError, 1, pContext, pRpc->tmrCtrl); + int64_t *rid = malloc(sizeof(int64_t)); + *rid = pContext->rid; + taosTmrStart(rpcProcessConnError, 1, rid, pRpc->tmrCtrl); return BOOL_ASYNC; } @@ -1442,7 +1448,9 @@ static TBOOL rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) { // try next ip again pContext->code = terrno; // in rpcProcessConnError if numOfTry over limit, could call rpcNotifyClient to stop query - taosTmrStart(rpcProcessConnError, 1, pContext, pRpc->tmrCtrl); + int64_t *rid = malloc(sizeof(int64_t)); + *rid = pContext->rid; + taosTmrStart(rpcProcessConnError, 1, rid, pRpc->tmrCtrl); return BOOL_ASYNC; } @@ -1480,11 +1488,23 @@ static bool rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { } static void rpcProcessConnError(void *param, void *id) { - SRpcReqContext *pContext = (SRpcReqContext *)param; + if (NULL == param) { + return; + } + + int64_t *rid = (int64_t*)param; + SRpcReqContext *pContext = (SRpcReqContext *)taosAcquireRef(tsRpcRefId, *rid); + if (NULL == pContext) { + free(param); + return; + } + SRpcInfo *pRpc = pContext->pRpc; SRpcMsg rpcMsg; if (pRpc == NULL) { + taosReleaseRef(tsRpcRefId, *rid); + free(param); return; } @@ -1504,6 +1524,9 @@ static void rpcProcessConnError(void *param, void *id) { pContext->epSet.inUse = pContext->epSet.inUse % pContext->epSet.numOfEps; rpcSendReqToServer(pRpc, pContext); } + + taosReleaseRef(tsRpcRefId, *rid); + free(param); } static void rpcProcessRetryTimer(void *param, void *tmrId) { @@ -1528,7 +1551,9 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { pConn->pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; pConn->pContext->pConn = NULL; pConn->pReqMsg = NULL; - taosTmrStart(rpcProcessConnError, 1, pConn->pContext, pRpc->tmrCtrl); + int64_t *rid = malloc(sizeof(int64_t)); + *rid = pConn->pContext->rid; + taosTmrStart(rpcProcessConnError, 1, rid, pRpc->tmrCtrl); rpcReleaseConn(pConn); } } @@ -1892,4 +1917,4 @@ bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext) { taosReleaseRef(tsRpcRefId, rpcRid); return true; -} \ No newline at end of file +}