提交 646b5e53 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

remove race condition in retry timer

上级 be1791e4
...@@ -555,18 +555,10 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort, ...@@ -555,18 +555,10 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort,
return pConn; return pConn;
} }
static void rpcCloseConn(void *thandle) { static void rpcReleaseConn(SRpcConn *pConn) {
SRpcConn *pConn = (SRpcConn *)thandle;
SRpcInfo *pRpc = pConn->pRpc; SRpcInfo *pRpc = pConn->pRpc;
if (pConn->user[0] == 0) return; if (pConn->user[0] == 0) return;
rpcLockConn(pConn);
if (pConn->user[0] == 0) {
rpcUnlockConn(pConn);
return;
}
pConn->user[0] = 0; pConn->user[0] = 0;
if (taosCloseConn[pConn->connType]) (*taosCloseConn[pConn->connType])(pConn->chandle); if (taosCloseConn[pConn->connType]) (*taosCloseConn[pConn->connType])(pConn->chandle);
...@@ -591,7 +583,21 @@ static void rpcCloseConn(void *thandle) { ...@@ -591,7 +583,21 @@ static void rpcCloseConn(void *thandle) {
taosFreeId(pRpc->idPool, pConn->sid); taosFreeId(pRpc->idPool, pConn->sid);
pConn->pContext = NULL; pConn->pContext = NULL;
tTrace("%s, rpc connection is closed", pConn->info); tTrace("%s, rpc connection is released", pConn->info);
}
static void rpcCloseConn(void *thandle) {
SRpcConn *pConn = (SRpcConn *)thandle;
if (pConn->user[0] == 0) return;
rpcLockConn(pConn);
if (pConn->user[0] == 0) {
rpcUnlockConn(pConn);
return;
}
rpcReleaseConn(pConn);
rpcUnlockConn(pConn); rpcUnlockConn(pConn);
} }
...@@ -911,8 +917,8 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) { ...@@ -911,8 +917,8 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) {
if (pConn->inType) rpcReportBrokenLinkToServer(pConn); if (pConn->inType) rpcReportBrokenLinkToServer(pConn);
rpcReleaseConn(pConn);
rpcUnlockConn(pConn); rpcUnlockConn(pConn);
rpcCloseConn(pConn);
} }
static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
...@@ -1217,7 +1223,6 @@ static void rpcProcessConnError(void *param, void *id) { ...@@ -1217,7 +1223,6 @@ static void rpcProcessConnError(void *param, void *id) {
static void rpcProcessRetryTimer(void *param, void *tmrId) { static void rpcProcessRetryTimer(void *param, void *tmrId) {
SRpcConn *pConn = (SRpcConn *)param; SRpcConn *pConn = (SRpcConn *)param;
SRpcInfo *pRpc = pConn->pRpc; SRpcInfo *pRpc = pConn->pRpc;
int reportDisc = 0;
rpcLockConn(pConn); rpcLockConn(pConn);
...@@ -1233,19 +1238,17 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) { ...@@ -1233,19 +1238,17 @@ static void rpcProcessRetryTimer(void *param, void *tmrId) {
} else { } else {
// close the connection // close the connection
tTrace("%s, failed to send msg:%s to %s:%hu", pConn->info, taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort); tTrace("%s, failed to send msg:%s to %s:%hu", pConn->info, taosMsg[pConn->outType], pConn->peerFqdn, pConn->peerPort);
reportDisc = 1; if (pConn->pContext) {
pConn->pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
rpcProcessConnError(pConn->pContext, NULL);
rpcReleaseConn(pConn);
}
} }
} else { } else {
tTrace("%s, retry timer not processed", pConn->info); tTrace("%s, retry timer not processed", pConn->info);
} }
rpcUnlockConn(pConn); rpcUnlockConn(pConn);
if (reportDisc && pConn->pContext) {
pConn->pContext->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
rpcProcessConnError(pConn->pContext, NULL);
rpcCloseConn(pConn);
}
} }
static void rpcProcessIdleTimer(void *param, void *tmrId) { static void rpcProcessIdleTimer(void *param, void *tmrId) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册