未验证 提交 9d7ef2ca 编写于 作者: M Minglei Jin 提交者: GitHub

Merge pull request #16069 from taosdata/fix/TD-18361-V26

fix(rpc): TCP probe check alive update alive if pConn is null
......@@ -282,7 +282,7 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
}
// if return true, send probe connection msg to server ok
bool sendProbeConnMsg(SSqlObj* pSql, int64_t stime) {
bool sendProbeConnMsg(SSqlObj* pSql, int64_t stime, bool *pReqOver) {
if(stime == 0) {
// not started, no need to probe
tscInfo("PROBE 0x%" PRIx64 " not start, no need probe.", pSql->self);
......@@ -318,8 +318,9 @@ bool sendProbeConnMsg(SSqlObj* pSql, int64_t stime) {
return true;
}
bool ret = rpcSendProbe(pSql->rpcRid, pSql->pPrevContext);
tscInfo("PROBE 0x%" PRIx64 " send probe msg, ret=%d rpcRid=0x%" PRIx64, pSql->self, ret, pSql->rpcRid);
bool ret = rpcSendProbe(pSql->rpcRid, pSql->pPrevContext, pReqOver);
if (!(*pReqOver))
tscInfo("PROBE 0x%" PRIx64 " send probe msg, ret=%d rpcRid=0x%" PRIx64, pSql->self, ret, pSql->rpcRid);
return ret;
}
......@@ -335,16 +336,22 @@ void checkBrokenQueries(STscObj *pTscObj) {
}
bool kill = false;
bool reqOver = false;
int32_t numOfSub = pSql->subState.numOfSub;
tscInfo("PROBE 0x%" PRIx64 " start checking sql alive, numOfSub=%d sql=%s stime=%" PRId64 " alive=%" PRId64 " rpcRid=0x%" PRIx64 \
,pSql->self, numOfSub, pSql->sqlstr == NULL ? "" : pSql->sqlstr, pSql->stime, pSql->lastAlive, pSql->rpcRid);
if (numOfSub == 0) {
// no sub sql
if(!sendProbeConnMsg(pSql, pSql->stime)) {
if(!sendProbeConnMsg(pSql, pSql->stime, &reqOver)) {
// need kill
tscInfo("PROBE 0x%" PRIx64 " need break link done. rpcRid=0x%" PRIx64, pSql->self, pSql->rpcRid);
kill = true;
}
if (reqOver) {
// current request has finished, so update alive to now
pSql->lastAlive = taosGetTimestampMs();
}
} else {
// lock subs
pthread_mutex_lock(&pSql->subState.mutex);
......@@ -354,13 +361,18 @@ void checkBrokenQueries(STscObj *pTscObj) {
SSqlObj *pSubSql = pSql->pSubs[i];
if(pSubSql) {
tscInfo("PROBE 0x%" PRIx64 " sub sql app is 0x%" PRIx64, pSql->self, pSubSql->self);
if(!sendProbeConnMsg(pSubSql, pSql->stime)) {
if(!sendProbeConnMsg(pSubSql, pSql->stime, &reqOver)) {
// need kill
tscInfo("PROBE 0x%" PRIx64 " i=%d sub app=0x%" PRIx64 " need break link done. rpcRid=0x%" PRIx64, pSql->self, i, pSubSql->self, pSubSql->rpcRid);
kill = true;
break;
}
}
if (reqOver) {
// current request has finished, so update alive to now
pSubSql->lastAlive = taosGetTimestampMs();
}
}
}
// unlock
......
......@@ -94,7 +94,7 @@ int rpcReportProgress(void *pConn, char *pCont, int contLen);
void rpcCancelRequest(int64_t rid);
int32_t rpcUnusedSession(void * rpcInfo, bool bLock);
// send rpc Refid connection probe alive message
bool rpcSendProbe(int64_t rpcRid, void* pPrevContext);
bool rpcSendProbe(int64_t rpcRid, void* pPrevContext, bool *pReqOver);
// after sql request send , save conn info
bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext);
......
......@@ -1805,7 +1805,7 @@ bool doRpcSendProbe(SRpcConn *pConn) {
}
// send server syn
bool rpcSendProbe(int64_t rpcRid, void* pPrevContext) {
bool rpcSendProbe(int64_t rpcRid, void* pPrevContext, bool *pReqOver) {
// return false can kill query
bool ret = false;
if(rpcRid < 0) {
......@@ -1828,7 +1828,10 @@ bool rpcSendProbe(int64_t rpcRid, void* pPrevContext) {
// conn same
if(pContext->pConn == NULL) {
tInfo("PROBE rpcRid=0x%" PRIx64 " connect obj is NULL. ", rpcRid);
tInfo("PROBE rpcRid=0x%" PRIx64 " reqContext->pConn is NULL. The req is finished.", rpcRid);
if (pReqOver)
*pReqOver = true;
ret = true;
goto _END;
} else if (pContext->pConn != pContext->sendInfo.pConn) {
......@@ -1850,7 +1853,10 @@ bool rpcSendProbe(int64_t rpcRid, void* pPrevContext) {
}
// send syn
ret = doRpcSendProbe(pContext->pConn);
if (!doRpcSendProbe(pContext->pConn)) {
tError("PROBE rpcRid=0x%" PRIx64 " fd=%d rpc send probe data error.", rpcRid, fd);
}
ret = true;
_END:
// put back req context
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册