diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 9bb63b751ab98483a10b2d191b9b64e14dbf7cd9..989021eb52aec0c02e39f7b21da2b2758df3cea9 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -489,17 +489,22 @@ void rpcSendRecv(void *shandle, SRpcIpSet *pIpSet, const SRpcMsg *pMsg, SRpcMsg // this API is used by server app to keep an APP context in case connection is broken int rpcReportProgress(void *handle, char *pCont, int contLen) { SRpcConn *pConn = (SRpcConn *)handle; + int code = 0; + + rpcLockConn(pConn); if (pConn->user[0]) { // pReqMsg and reqMsgLen is re-used to store the context from app server pConn->pReqMsg = pCont; pConn->reqMsgLen = contLen; - return 0; - } + } else { + tTrace("%s, rpc connection is already released", pConn->info); + rpcFreeCont(pCont); + code = -1; + } - tTrace("%s, rpc connection is already released", pConn->info); - rpcFreeCont(pCont); - return -1; + rpcUnlockConn(pConn); + return code; } /* todo: cancel process may have race condition, pContext may have been released @@ -555,18 +560,10 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, return pConn; } -static void rpcCloseConn(void *thandle) { - SRpcConn *pConn = (SRpcConn *)thandle; +static void rpcReleaseConn(SRpcConn *pConn) { SRpcInfo *pRpc = pConn->pRpc; if (pConn->user[0] == 0) return; - rpcLockConn(pConn); - - if (pConn->user[0] == 0) { - rpcUnlockConn(pConn); - return; - } - pConn->user[0] = 0; if (taosCloseConn[pConn->connType]) (*taosCloseConn[pConn->connType])(pConn->chandle); @@ -591,7 +588,16 @@ static void rpcCloseConn(void *thandle) { taosFreeId(pRpc->idPool, pConn->sid); pConn->pContext = NULL; - tTrace("%s, rpc connection is closed", pConn->info); + tTrace("%s, rpc connection is released", pConn->info); +} + +static void rpcCloseConn(void *thandle) { + SRpcConn *pConn = (SRpcConn *)thandle; + + rpcLockConn(pConn); + + if (pConn->user[0]) + rpcReleaseConn(pConn); rpcUnlockConn(pConn); } @@ -911,8 +917,8 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) { if (pConn->inType) rpcReportBrokenLinkToServer(pConn); + rpcReleaseConn(pConn); rpcUnlockConn(pConn); - rpcCloseConn(pConn); } static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { @@ -1217,7 +1223,6 @@ static void rpcProcessConnError(void *param, void *id) { static void rpcProcessRetryTimer(void *param, void *tmrId) { SRpcConn *pConn = (SRpcConn *)param; SRpcInfo *pRpc = pConn->pRpc; - int reportDisc = 0; rpcLockConn(pConn); @@ -1233,31 +1238,33 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { } else { // close the connection tTrace("%s, failed to send msg:%s to %s:%hu", pConn->info, taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort); - reportDisc = 1; + if (pConn->pContext) { + pConn->pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; + taosTmrStart(rpcProcessConnError, 0, pConn->pContext, pRpc->tmrCtrl); + rpcReleaseConn(pConn); + } } } else { tTrace("%s, retry timer not processed", pConn->info); } rpcUnlockConn(pConn); - - if (reportDisc && pConn->pContext) { - pConn->pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; - rpcProcessConnError(pConn->pContext, NULL); - rpcCloseConn(pConn); - } } static void rpcProcessIdleTimer(void *param, void *tmrId) { SRpcConn *pConn = (SRpcConn *)param; + rpcLockConn(pConn); + if (pConn->user[0]) { tTrace("%s, close the connection since no activity", pConn->info); if (pConn->inType) rpcReportBrokenLinkToServer(pConn); - rpcCloseConn(pConn); + rpcReleaseConn(pConn); } else { tTrace("%s, idle timer:%p not processed", pConn->info, tmrId); } + + rpcUnlockConn(pConn); } static void rpcProcessProgressTimer(void *param, void *tmrId) {