提交 5b327d25 编写于 作者: A Alex Duan

fit(rpc): TCP probe msg if pConn is null set alive

上级 1c36c692
...@@ -282,7 +282,7 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { ...@@ -282,7 +282,7 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
} }
// if return true, send probe connection msg to sever ok // if return true, send probe connection msg to sever ok
bool sendProbeConnMsg(SSqlObj* pSql, int64_t stime) { bool sendProbeConnMsg(SSqlObj* pSql, int64_t stime, bool *pReqOver) {
if(stime == 0) { if(stime == 0) {
// not start , no need probe // not start , no need probe
tscInfo("PROBE 0x%" PRIx64 " not start, no need probe.", pSql->self); tscInfo("PROBE 0x%" PRIx64 " not start, no need probe.", pSql->self);
...@@ -318,8 +318,9 @@ bool sendProbeConnMsg(SSqlObj* pSql, int64_t stime) { ...@@ -318,8 +318,9 @@ bool sendProbeConnMsg(SSqlObj* pSql, int64_t stime) {
return true; return true;
} }
bool ret = rpcSendProbe(pSql->rpcRid, pSql->pPrevContext); bool ret = rpcSendProbe(pSql->rpcRid, pSql->pPrevContext, pReqOver);
tscInfo("PROBE 0x%" PRIx64 " send probe msg, ret=%d rpcRid=0x%" PRIx64, pSql->self, ret, pSql->rpcRid); if (!(*pReqOver))
tscInfo("PROBE 0x%" PRIx64 " send probe msg, ret=%d rpcRid=0x%" PRIx64, pSql->self, ret, pSql->rpcRid);
return ret; return ret;
} }
...@@ -335,16 +336,22 @@ void checkBrokenQueries(STscObj *pTscObj) { ...@@ -335,16 +336,22 @@ void checkBrokenQueries(STscObj *pTscObj) {
} }
bool kill = false; bool kill = false;
bool reqOver = false;
int32_t numOfSub = pSql->subState.numOfSub; int32_t numOfSub = pSql->subState.numOfSub;
tscInfo("PROBE 0x%" PRIx64 " start checking sql alive, numOfSub=%d sql=%s stime=%" PRId64 " alive=%" PRId64 " rpcRid=0x%" PRIx64 \ tscInfo("PROBE 0x%" PRIx64 " start checking sql alive, numOfSub=%d sql=%s stime=%" PRId64 " alive=%" PRId64 " rpcRid=0x%" PRIx64 \
,pSql->self, numOfSub, pSql->sqlstr == NULL ? "" : pSql->sqlstr, pSql->stime, pSql->lastAlive, pSql->rpcRid); ,pSql->self, numOfSub, pSql->sqlstr == NULL ? "" : pSql->sqlstr, pSql->stime, pSql->lastAlive, pSql->rpcRid);
if (numOfSub == 0) { if (numOfSub == 0) {
// no sub sql // no sub sql
if(!sendProbeConnMsg(pSql, pSql->stime)) { if(!sendProbeConnMsg(pSql, pSql->stime, &reqOver)) {
// need kill // need kill
tscInfo("PROBE 0x%" PRIx64 " need break link done. rpcRid=0x%" PRIx64, pSql->self, pSql->rpcRid); tscInfo("PROBE 0x%" PRIx64 " need break link done. rpcRid=0x%" PRIx64, pSql->self, pSql->rpcRid);
kill = true; kill = true;
} }
if (reqOver) {
// current request is finished over, so upate alive to now
pSql->lastAlive = taosGetTimestampMs();
}
} else { } else {
// lock subs // lock subs
pthread_mutex_lock(&pSql->subState.mutex); pthread_mutex_lock(&pSql->subState.mutex);
...@@ -354,12 +361,17 @@ void checkBrokenQueries(STscObj *pTscObj) { ...@@ -354,12 +361,17 @@ void checkBrokenQueries(STscObj *pTscObj) {
SSqlObj *pSubSql = pSql->pSubs[i]; SSqlObj *pSubSql = pSql->pSubs[i];
if(pSubSql) { if(pSubSql) {
tscInfo("PROBE 0x%" PRIx64 " sub sql app is 0x%" PRIx64, pSql->self, pSubSql->self); tscInfo("PROBE 0x%" PRIx64 " sub sql app is 0x%" PRIx64, pSql->self, pSubSql->self);
if(!sendProbeConnMsg(pSubSql, pSql->stime)) { if(!sendProbeConnMsg(pSubSql, pSql->stime, &reqOver)) {
// need kill // need kill
tscInfo("PROBE 0x%" PRIx64 " i=%d sub app=0x%" PRIx64 " need break link done. rpcRid=0x%" PRIx64, pSql->self, i, pSubSql->self, pSubSql->rpcRid); tscInfo("PROBE 0x%" PRIx64 " i=%d sub app=0x%" PRIx64 " need break link done. rpcRid=0x%" PRIx64, pSql->self, i, pSubSql->self, pSubSql->rpcRid);
kill = true; kill = true;
break; break;
} }
if (reqOver) {
// current request is finished over, so upate alive to now
pSubSql->lastAlive = taosGetTimestampMs();
}
} }
} }
} }
......
...@@ -93,7 +93,7 @@ void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp ...@@ -93,7 +93,7 @@ void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp
int rpcReportProgress(void *pConn, char *pCont, int contLen); int rpcReportProgress(void *pConn, char *pCont, int contLen);
void rpcCancelRequest(int64_t rid); void rpcCancelRequest(int64_t rid);
// send rpc Refid connection probe alive message // send rpc Refid connection probe alive message
bool rpcSendProbe(int64_t rpcRid, void* pPrevContext); bool rpcSendProbe(int64_t rpcRid, void* pPrevContext, bool *pReqOver);
// after sql request send , save conn info // after sql request send , save conn info
bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext); bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext);
......
...@@ -1784,7 +1784,7 @@ bool doRpcSendProbe(SRpcConn *pConn) { ...@@ -1784,7 +1784,7 @@ bool doRpcSendProbe(SRpcConn *pConn) {
} }
// send server syn // send server syn
bool rpcSendProbe(int64_t rpcRid, void* pPrevContext) { bool rpcSendProbe(int64_t rpcRid, void* pPrevContext, bool *pReqOver) {
// return false can kill query // return false can kill query
bool ret = false; bool ret = false;
if(rpcRid < 0) { if(rpcRid < 0) {
...@@ -1807,7 +1807,9 @@ bool rpcSendProbe(int64_t rpcRid, void* pPrevContext) { ...@@ -1807,7 +1807,9 @@ bool rpcSendProbe(int64_t rpcRid, void* pPrevContext) {
// conn same // conn same
if(pContext->pConn == NULL) { if(pContext->pConn == NULL) {
tInfo("PROBE rpcRid=0x%" PRIx64 " connect obj is NULL. ", rpcRid); tInfo("PROBE rpcRid=0x%" PRIx64 " connect obj is NULL. req is response and done.", rpcRid);
if(pReqOver)
*pReqOver = true;
ret = true; ret = true;
goto _END; goto _END;
} else if (pContext->pConn != pContext->sendInfo.pConn) { } else if (pContext->pConn != pContext->sendInfo.pConn) {
...@@ -1829,7 +1831,10 @@ bool rpcSendProbe(int64_t rpcRid, void* pPrevContext) { ...@@ -1829,7 +1831,10 @@ bool rpcSendProbe(int64_t rpcRid, void* pPrevContext) {
} }
// send syn // send syn
ret = doRpcSendProbe(pContext->pConn); if (!doRpcSendProbe(pContext->pConn)) {
tError("PROBE rpcRid=0x%" PRIx64 " fd=%d rpc send probe data error.", rpcRid, fd);
}
ret = true;
_END: _END:
// put back req context // put back req context
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册