未验证 提交 59e60e45 编写于 作者: H hzcheng 提交者: GitHub

Merge pull request #809 from taosdata/hotfix/#808

fix #808
...@@ -1059,6 +1059,16 @@ int taosBuildErrorMsgToPeer(char *pMsg, int code, char *pReply) { ...@@ -1059,6 +1059,16 @@ int taosBuildErrorMsgToPeer(char *pMsg, int code, char *pReply) {
return msgLen; return msgLen;
} }
void taosReportDisconnection(SRpcChann *pChann, SRpcConn *pConn)
{
SSchedMsg schedMsg;
schedMsg.fp = taosProcessSchedMsg;
schedMsg.msg = NULL;
schedMsg.ahandle = pConn->ahandle;
schedMsg.thandle = pConn;
taosScheduleTask(pChann->qhandle, &schedMsg);
}
void taosProcessIdleTimer(void *param, void *tmrId) { void taosProcessIdleTimer(void *param, void *tmrId) {
SRpcConn *pConn = (SRpcConn *)param; SRpcConn *pConn = (SRpcConn *)param;
if (pConn->signature != param) { if (pConn->signature != param) {
...@@ -1074,22 +1084,20 @@ void taosProcessIdleTimer(void *param, void *tmrId) { ...@@ -1074,22 +1084,20 @@ void taosProcessIdleTimer(void *param, void *tmrId) {
return; return;
} }
int reportDisc = 0;
pthread_mutex_lock(&pChann->mutex); pthread_mutex_lock(&pChann->mutex);
tTrace("%s cid:%d sid:%d id:%s, close the connection since no activity pConn:%p", pServer->label, pConn->chann, tTrace("%s cid:%d sid:%d id:%s, close the connection since no activity pConn:%p", pServer->label, pConn->chann,
pConn->sid, pConn->meterId, pConn); pConn->sid, pConn->meterId, pConn);
if (pConn->rspReceived == 0) { if (pConn->rspReceived == 0) {
pConn->rspReceived = 1; pConn->rspReceived = 1;
reportDisc = 1;
SSchedMsg schedMsg;
schedMsg.fp = taosProcessSchedMsg;
schedMsg.msg = NULL;
schedMsg.ahandle = pConn->ahandle;
schedMsg.thandle = pConn;
taosScheduleTask(pChann->qhandle, &schedMsg);
} }
pthread_mutex_unlock(&pChann->mutex); pthread_mutex_unlock(&pChann->mutex);
if (reportDisc) taosReportDisconnection(pChann, pConn);
} }
void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *thandle, void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t port, void *shandle, void *thandle,
...@@ -1114,11 +1122,7 @@ void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t por ...@@ -1114,11 +1122,7 @@ void *taosProcessDataFromPeer(char *data, int dataLen, uint32_t ip, uint16_t por
pConn->meterId, pConn); pConn->meterId, pConn);
pConn->rspReceived = 1; pConn->rspReceived = 1;
pConn->chandle = NULL; pConn->chandle = NULL;
schedMsg.fp = taosProcessSchedMsg; taosReportDisconnection(pChann, pConn);
schedMsg.msg = NULL;
schedMsg.ahandle = pConn->ahandle;
schedMsg.thandle = pConn;
taosScheduleTask(pChann->qhandle, &schedMsg);
} }
tfree(data); tfree(data);
return NULL; return NULL;
...@@ -1330,6 +1334,7 @@ void taosProcessTaosTimer(void *param, void *tmrId) { ...@@ -1330,6 +1334,7 @@ void taosProcessTaosTimer(void *param, void *tmrId) {
STaosHeader *pHeader = NULL; STaosHeader *pHeader = NULL;
SRpcConn * pConn = (SRpcConn *)param; SRpcConn * pConn = (SRpcConn *)param;
int msgLen; int msgLen;
int reportDisc = 0;
if (pConn->signature != param) { if (pConn->signature != param) {
tError("pConn Signature:0x%x, pConn:0x%x not matched", pConn->signature, param); tError("pConn Signature:0x%x, pConn:0x%x not matched", pConn->signature, param);
...@@ -1379,13 +1384,7 @@ void taosProcessTaosTimer(void *param, void *tmrId) { ...@@ -1379,13 +1384,7 @@ void taosProcessTaosTimer(void *param, void *tmrId) {
pConn->sid, pConn->meterId, taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort, pConn); pConn->sid, pConn->meterId, taosMsg[pConn->outType], pConn->peerIpstr, pConn->peerPort, pConn);
if (pConn->rspReceived == 0) { if (pConn->rspReceived == 0) {
pConn->rspReceived = 1; pConn->rspReceived = 1;
reportDisc = 1;
SSchedMsg schedMsg;
schedMsg.fp = taosProcessSchedMsg;
schedMsg.msg = NULL;
schedMsg.ahandle = pConn->ahandle;
schedMsg.thandle = pConn;
taosScheduleTask(pChann->qhandle, &schedMsg);
} }
} }
} }
...@@ -1397,6 +1396,7 @@ void taosProcessTaosTimer(void *param, void *tmrId) { ...@@ -1397,6 +1396,7 @@ void taosProcessTaosTimer(void *param, void *tmrId) {
pthread_mutex_unlock(&pChann->mutex); pthread_mutex_unlock(&pChann->mutex);
if (reportDisc) taosReportDisconnection(pChann, pConn);
} }
void taosGetRpcConnInfo(void *thandle, uint32_t *peerId, uint32_t *peerIp, uint16_t *peerPort, int *cid, int *sid) { void taosGetRpcConnInfo(void *thandle, uint32_t *peerId, uint32_t *peerIp, uint16_t *peerPort, int *cid, int *sid) {
...@@ -1443,22 +1443,19 @@ void taosStopRpcConn(void *thandle) { ...@@ -1443,22 +1443,19 @@ void taosStopRpcConn(void *thandle) {
tTrace("%s cid:%d sid:%d id:%s, stop the connection pConn:%p", pServer->label, pConn->chann, pConn->sid, tTrace("%s cid:%d sid:%d id:%s, stop the connection pConn:%p", pServer->label, pConn->chann, pConn->sid,
pConn->meterId, pConn); pConn->meterId, pConn);
int reportDisc = 0;
pthread_mutex_lock(&pChann->mutex); pthread_mutex_lock(&pChann->mutex);
if (pConn->outType) { if (pConn->outType) {
pConn->rspReceived = 1; pConn->rspReceived = 1;
SSchedMsg schedMsg; reportDisc = 1;
schedMsg.fp = taosProcessSchedMsg;
schedMsg.msg = NULL;
schedMsg.ahandle = pConn->ahandle;
schedMsg.thandle = pConn;
pthread_mutex_unlock(&pChann->mutex); pthread_mutex_unlock(&pChann->mutex);
taosScheduleTask(pChann->qhandle, &schedMsg);
} else { } else {
pthread_mutex_unlock(&pChann->mutex); pthread_mutex_unlock(&pChann->mutex);
taosCloseRpcConn(pConn); taosCloseRpcConn(pConn);
} }
if (reportDisc) taosReportDisconnection(pChann, pConn);
} }
int taosAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey) { int taosAuthenticateMsg(uint8_t *pMsg, int msgLen, uint8_t *pAuth, uint8_t *pKey) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册