diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index 1da4c3f529bb7d88d051e8f22989d4312bb36379..fe07e616a11777c17b91b3c784b2834135f04be6 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -58,6 +58,7 @@ typedef struct { void (*cfp)(SRpcMsg *, SRpcIpSet *); int (*afp)(char *user, char *spi, char *encrypt, char *secret, char *ckey); + int refCount; void *idPool; // handle to ID pool void *tmrCtrl; // handle to timer SHashObj *hash; // handle returned by hash utility @@ -199,6 +200,8 @@ static int rpcAddAuthPart(SRpcConn *pConn, char *msg, int msgLen); static int rpcCheckAuthentication(SRpcConn *pConn, char *msg, int msgLen); static void rpcLockConn(SRpcConn *pConn); static void rpcUnlockConn(SRpcConn *pConn); +static void rpcAddRef(SRpcInfo *pRpc); +static void rpcDecRef(SRpcInfo *pRpc); void *rpcOpen(const SRpcInit *pInit) { SRpcInfo *pRpc; @@ -224,6 +227,7 @@ void *rpcOpen(const SRpcInit *pInit) { pRpc->spi = pInit->spi; pRpc->cfp = pInit->cfp; pRpc->afp = pInit->afp; + pRpc->refCount = 1; size_t size = sizeof(SRpcConn) * pRpc->sessions; pRpc->connList = (SRpcConn *)calloc(1, size); @@ -293,15 +297,8 @@ void rpcClose(void *param) { (*taosCleanUpConn[pRpc->connType | RPC_CONN_TCP])(pRpc->tcphandle); (*taosCleanUpConn[pRpc->connType])(pRpc->udphandle); - taosHashCleanup(pRpc->hash); - taosTmrCleanUp(pRpc->tmrCtrl); - taosIdPoolCleanUp(pRpc->idPool); - rpcCloseConnCache(pRpc->pCache); - - tfree(pRpc->connList); - pthread_mutex_destroy(&pRpc->mutex); tTrace("%s rpc is closed", pRpc->label); - tfree(pRpc); + rpcDecRef(pRpc); } void *rpcMallocCont(int contLen) { @@ -378,6 +375,7 @@ void rpcSendResponse(const SRpcMsg *pRsp) { SRpcConn *pConn = (SRpcConn *)pRsp->handle; SRpcMsg rpcMsg = *pRsp; SRpcMsg *pMsg = &rpcMsg; + SRpcInfo *pRpc = pConn->pRpc; if ( pMsg->pCont == NULL ) { pMsg->pCont = rpcMallocCont(0); @@ -395,6 +393,7 @@ void rpcSendResponse(const SRpcMsg *pRsp) { if ( pConn->inType == 0 || pConn->user[0] == 0 ) { tTrace("%s, connection is already released, rsp wont be sent", pConn->info); rpcUnlockConn(pConn); + rpcDecRef(pRpc); return; } @@ -420,7 +419,6 @@ void rpcSendResponse(const SRpcMsg *pRsp) { pConn->rspMsgLen = msgLen; if (pMsg->code == TSDB_CODE_ACTION_IN_PROGRESS) pConn->inTranId--; - SRpcInfo *pRpc = pConn->pRpc; taosTmrStopA(&pConn->pTimer); // set the idle timer to monitor the activity @@ -429,6 +427,7 @@ void rpcSendResponse(const SRpcMsg *pRsp) { pConn->secured = 1; // connection shall be secured rpcUnlockConn(pConn); + rpcDecRef(pRpc); // decrease the referene count return; } @@ -951,6 +950,7 @@ static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead) { if ( rpcIsReq(pHead->msgType) ) { rpcMsg.handle = pConn; + rpcAddRef(pRpc); // add the refCount for requests pConn->pTimer = taosTmrStart(rpcProcessProgressTimer, tsProgressTimer, pConn, pRpc->tmrCtrl); (*(pRpc->cfp))(&rpcMsg, NULL); } else { @@ -1433,3 +1433,23 @@ static void rpcUnlockConn(SRpcConn *pConn) { } } +static void rpcAddRef(SRpcInfo *pRpc) +{ + atomic_add_fetch_8(&pRpc->refCount, 1); +} + +static void rpcDecRef(SRpcInfo *pRpc) +{ + if (atomic_sub_fetch_8(&pRpc->refCount, 1) == 0) { + taosHashCleanup(pRpc->hash); + taosTmrCleanUp(pRpc->tmrCtrl); + taosIdPoolCleanUp(pRpc->idPool); + rpcCloseConnCache(pRpc->pCache); + + tfree(pRpc->connList); + pthread_mutex_destroy(&pRpc->mutex); + tTrace("%s rpc resources are released", pRpc->label); + tfree(pRpc); + } +} +