diff --git a/src/rpc/src/trpc.c b/src/rpc/src/trpc.c index d84746817dd524fbb9e7a5b4aea1631ffda1c9ab..9f006ab05a9612a1ca2f6f8e3e790156938bf423 100755 --- a/src/rpc/src/trpc.c +++ b/src/rpc/src/trpc.c @@ -1059,6 +1059,16 @@ int taosBuildErrorMsgToPeer(char *pMsg, int code, char *pReply) { return msgLen; } +void taosReportDisconnection(SRpcChann *pChann, SRpcConn *pConn) +{ + SSchedMsg schedMsg; + schedMsg.fp = taosProcessSchedMsg; + schedMsg.msg = NULL; + schedMsg.ahandle = pConn->ahandle; + schedMsg.thandle = pConn; + taosScheduleTask(pChann->qhandle, &schedMsg); +} + void taosProcessIdleTimer(void *param, void *tmrId) { SRpcConn *pConn = (SRpcConn *)param; if (pConn->signature != param) { @@ -1074,22 +1084,20 @@ void taosProcessIdleTimer(void *param, void *tmrId) { return; } + int reportDisc = 0; + pthread_mutex_lock(&pChann->mutex); tTrace("%s cid:%d sid:%d id:%s, close the connection since no activity pConn:%p", pServer->label, pConn->chann, pConn->sid, pConn->meterId, pConn); if (pConn->rspReceived == 0) { pConn->rspReceived = 1; - - SSchedMsg schedMsg; - schedMsg.fp = taosProcessSchedMsg; - schedMsg.msg = NULL; - schedMsg.ahandle = pConn->ahandle; - schedMsg.thandle = pConn; - taosScheduleTask(pChann->qhandle, &schedMsg); + reportDisc = 1; } pthread_mutex_unlock(&pChann->mutex); + + if (reportDisc) taosReportDisconnection(pChann, pConn); } void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *thandle, @@ -1114,11 +1122,7 @@ void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t por pConn->meterId, pConn); pConn->rspReceived = 1; pConn->chandle = NULL; - schedMsg.fp = taosProcessSchedMsg; - schedMsg.msg = NULL; - schedMsg.ahandle = pConn->ahandle; - schedMsg.thandle = pConn; - taosScheduleTask(pChann->qhandle, &schedMsg); + taosReportDisconnection(pChann, pConn); } tfree(data); return NULL; @@ -1330,6 +1334,7 @@ void taosProcessTaosTimer(void *param, void *tmrId) { STaosHeader *pHeader = NULL; SRpcConn * pConn = (SRpcConn *)param; int msgLen; + int reportDisc = 0; if (pConn->signature != param) { tError("pConn Signature:0x%x, pConn:0x%x not matched", pConn->signature, param); @@ -1379,13 +1384,7 @@ void taosProcessTaosTimer(void *param, void *tmrId) { pConn->sid, pConn->meterId, taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort, pConn); if (pConn->rspReceived == 0) { pConn->rspReceived = 1; - - SSchedMsg schedMsg; - schedMsg.fp = taosProcessSchedMsg; - schedMsg.msg = NULL; - schedMsg.ahandle = pConn->ahandle; - schedMsg.thandle = pConn; - taosScheduleTask(pChann->qhandle, &schedMsg); + reportDisc = 1; } } } @@ -1397,6 +1396,7 @@ void taosProcessTaosTimer(void *param, void *tmrId) { pthread_mutex_unlock(&pChann->mutex); + if (reportDisc) taosReportDisconnection(pChann, pConn); } void taosGetRpcConnInfo(void *thandle, uint32_t *peerId, uint32_t *peerIp, uint16_t *peerPort, int *cid, int *sid) { @@ -1443,22 +1443,19 @@ void taosStopRpcConn(void *thandle) { tTrace("%s cid:%d sid:%d id:%s, stop the connection pConn:%p", pServer->label, pConn->chann, pConn->sid, pConn->meterId, pConn); + int reportDisc = 0; pthread_mutex_lock(&pChann->mutex); if (pConn->outType) { pConn->rspReceived = 1; - SSchedMsg schedMsg; - schedMsg.fp = taosProcessSchedMsg; - schedMsg.msg = NULL; - schedMsg.ahandle = pConn->ahandle; - schedMsg.thandle = pConn; + reportDisc = 1; pthread_mutex_unlock(&pChann->mutex); - - taosScheduleTask(pChann->qhandle, &schedMsg); } else { pthread_mutex_unlock(&pChann->mutex); taosCloseRpcConn(pConn); } + + if (reportDisc) taosReportDisconnection(pChann, pConn); } int taosAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey) {