提交 9f20bf05 编写于 作者: A Alex Duan

feat(rpc): probe add complete log

上级 19bf81a4
...@@ -401,7 +401,6 @@ typedef struct SSqlObj { ...@@ -401,7 +401,6 @@ typedef struct SSqlObj {
// connect alive // connect alive
int64_t lastProbe; int64_t lastProbe;
int64_t lastAlive; int64_t lastAlive;
char noAckCnt; // no recevie ack from sever count
void * pPrevContext; void * pPrevContext;
void * pPrevConn; void * pPrevConn;
void * pPrevFdObj; void * pPrevFdObj;
......
...@@ -284,10 +284,9 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { ...@@ -284,10 +284,9 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
// pSql connection link is broken // pSql connection link is broken
bool dealConnBroken(SSqlObj * pSql) { bool dealConnBroken(SSqlObj * pSql) {
// check valid // check valid
if (pSql == NULL || pSql->signature != pSql) {
return false; if (pSql->signature != pSql) {
} tscInfo("PROBE 0x%" PRIx64 " break link signature is not equal pSql. signature=%p", pSql->self, pSql->signature);
if (pSql->cmd.command >= TSDB_SQL_LOCAL) {
return false; return false;
} }
...@@ -296,13 +295,15 @@ bool dealConnBroken(SSqlObj * pSql) { ...@@ -296,13 +295,15 @@ bool dealConnBroken(SSqlObj * pSql) {
// cancel // cancel
if (pSql->rpcRid > 0) { if (pSql->rpcRid > 0) {
tscDebug("PROBE 0x%" PRIx64 " rpc cancel request rpcRid=0x%" PRIx64 ".", pSql->self, pSql->rpcRid); tscInfo("PROBE 0x%" PRIx64 " break link done. rpcRid=0x%" PRIx64, pSql->self, pSql->rpcRid);
rpcCancelRequest(pSql->rpcRid); rpcCancelRequest(pSql->rpcRid);
pSql->rpcRid = -1; pSql->rpcRid = -1;
} else {
tscInfo("PROBE 0x%" PRIx64 " break link rpcRid <=0. rpcRid=0x%" PRIx64, pSql->self, pSql->rpcRid);
} }
// error // error notify
tscDebug("PROBE 0x%"PRIx64" async result error.", pSql->self); tscInfo("PROBE 0x%"PRIx64" async result error. rpcRid=0x%" PRIx64, pSql->self, pSql->rpcRid);
tscAsyncResultOnError(pSql); tscAsyncResultOnError(pSql);
return true; return true;
...@@ -316,6 +317,7 @@ bool sendProbeConnMsg(SSqlObj* pSql) { ...@@ -316,6 +317,7 @@ bool sendProbeConnMsg(SSqlObj* pSql) {
if(pSql->stime == 0) { if(pSql->stime == 0) {
// not start , no need probe // not start , no need probe
tscInfo("PROBE 0x%" PRIx64 " not start, no need probe.", pSql->self);
return true; return true;
} }
...@@ -323,39 +325,47 @@ bool sendProbeConnMsg(SSqlObj* pSql) { ...@@ -323,39 +325,47 @@ bool sendProbeConnMsg(SSqlObj* pSql) {
int32_t diff = (int32_t)(taosGetTimestampMs() - stime); int32_t diff = (int32_t)(taosGetTimestampMs() - stime);
if (diff < tsProbeSeconds * 1000) { if (diff < tsProbeSeconds * 1000) {
// exec time short , need not probe alive // exec time short , need not probe alive
tscInfo("PROBE 0x%" PRIx64 "not arrived probe time. timeout=%ds, no need probe. lastAlive=%" PRId64 " stime=%" PRId64, \
pSql->self, tsProbeSeconds, pSql->lastAlive, pSql->stime);
return true; return true;
} }
if (diff > tsProbeKillSeconds * 1000) { if (diff > tsProbeKillSeconds * 1000) {
// need kill query // need kill query
tscDebug("PROBE 0x%"PRIx64" need killed, noAckCnt:%d diff=%d", pSql->self, pSql->noAckCnt, diff); tscInfo("PROBE 0x%" PRIx64 "kill query by probe. because arrived kill time. timeout=%ds lastAlive=%" PRId64 " stime=%" PRId64, \
pSql->self, tsProbeKillSeconds, pSql->lastAlive, pSql->stime);
return false; return false;
} }
if (pSql->pPrevContext == NULL || pSql->pPrevConn == NULL || pSql->pPrevFdObj == NULL || pSql->prevFd <= 0) { if (pSql->pPrevContext == NULL || pSql->pPrevConn == NULL || pSql->pPrevFdObj == NULL || pSql->prevFd <= 0) {
// last connect info save uncompletely, so can't probe // last connect info save uncompletely, so can't probe
tscInfo("PROBE 0x%" PRIx64 "save last connect info uncompletely. prev context=%p conn=%p fdobj=%p fd=%d", \
pSql->self, pSql->pPrevContext, pSql->pPrevConn, pSql->pPrevFdObj, pSql->prevFd);
return true; return true;
} }
if(pSql->rpcRid == -1) { if(pSql->rpcRid == -1) {
// cancel or reponse ok from server, so need not probe // cancel or reponse ok from server, so need not probe
tscInfo("PROBE 0x%" PRIx64 " rpcRid is -1, response ok. no need probe.", pSql->self);
return true; return true;
} }
// It's long time from lastAlive, so need probe // It's long time from lastAlive, so need probe
pSql->noAckCnt++;
pSql->lastProbe = taosGetTimestampMs(); pSql->lastProbe = taosGetTimestampMs();
tscDebug("0x%"PRIx64" sendProbeConnMsg noAckCnt:%d diff=%d", pSql->self, pSql->noAckCnt, diff);
bool ret = rpcSendProbe(pSql->rpcRid, pSql->pPrevContext, pSql->pPrevConn, pSql->pPrevFdObj, pSql->prevFd);
return rpcSendProbe(pSql->rpcRid, pSql->pPrevContext, pSql->pPrevConn, pSql->pPrevFdObj, pSql->prevFd); tscInfo("PROBE 0x%" PRIx64 " rpcRid=0x%" PRIx64 " send probe msg, ret=%d", pSql->self, pSql->rpcRid, ret);
return ret;
} }
// check have broken link queries than killed // check have broken link queries than killed
void checkBrokenQueries(STscObj *pTscObj) { void checkBrokenQueries(STscObj *pTscObj) {
// tscDebug("PROBE checkBrokenQueries pTscObj=%p pTscObj->rid=0x%" PRIx64, pTscObj, pTscObj->rid);
SSqlObj *pSql = pTscObj->sqlList; SSqlObj *pSql = pTscObj->sqlList;
while (pSql) { while (pSql) {
int32_t numOfSub = pSql->subState.numOfSub; int32_t numOfSub = pSql->subState.numOfSub;
tscInfo("PROBE 0x%" PRIx64 " numOfSub=%d sql=%s", pSql->self, numOfSub, pSql->sqlstr == NULL ? "" : pSql->sqlstr);
if (numOfSub == 0) { if (numOfSub == 0) {
// no sub sql // no sub sql
if(!sendProbeConnMsg(pSql)) { if(!sendProbeConnMsg(pSql)) {
...@@ -396,9 +406,9 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { ...@@ -396,9 +406,9 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
assert(pHB->self == pObj->hbrid); assert(pHB->self == pObj->hbrid);
// check queries already death // check queries already death
static int activetyTimerCnt = 0; static int activetyCnt = 0;
if (++activetyTimerCnt > 3) { // 1.5s * 10 = 15s interval call if (++activetyCnt > 3) { // 1.5s * 10 = 15s interval call
activetyTimerCnt = 0; activetyCnt = 0;
// call check if have query doing // call check if have query doing
if(pObj->sqlList) { if(pObj->sqlList) {
...@@ -532,9 +542,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { ...@@ -532,9 +542,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
// check msgtype // check msgtype
if(rpcMsg->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) { if(rpcMsg->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) {
pSql->noAckCnt = 0;
pSql->lastAlive = taosGetTimestampMs(); pSql->lastAlive = taosGetTimestampMs();
tscDebug("PROBE 0x%" PRIx64 " recv probe msg. sql=%s", pSql->self, pSql->sqlstr); tscDebug("PROBE 0x%" PRIx64 " recv probe response msg. rpcRid=0x%" PRIx64, pSql->self, pSql->rpcRid);
rpcFreeCont(rpcMsg->pCont); rpcFreeCont(rpcMsg->pCont);
return ; return ;
} }
......
...@@ -6368,7 +6368,9 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { ...@@ -6368,7 +6368,9 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
// TEST TODU DELETE // TEST TODU DELETE
taosMsleep(10*1000); static int loop = 0;
taosMsleep(3*1000);
qInfo(" loop=%d pEnv=%p", loop++, pRuntimeEnv);
if (pBlock == NULL) { if (pBlock == NULL) {
......
...@@ -1101,7 +1101,7 @@ static void rpcProcessProbeMsg(SRecvInfo *pRecv, SRpcConn *pConn) { ...@@ -1101,7 +1101,7 @@ static void rpcProcessProbeMsg(SRecvInfo *pRecv, SRpcConn *pConn) {
memcpy(rspHead.user, pHead->user, tListLen(pHead->user)); memcpy(rspHead.user, pHead->user, tListLen(pHead->user));
bool ret = rpcSendMsgToPeer(pConn, &rspHead, sizeof(SRpcHead)); bool ret = rpcSendMsgToPeer(pConn, &rspHead, sizeof(SRpcHead));
tDebug("PROBE 0x%" PRIx64 " recv probe msg and response. ret=%d", pHead->ahandle, ret); tInfo("PROBE 0x%" PRIx64 " recv probe msg and do response. ret=%d", pHead->ahandle, ret);
rpcFreeMsg(pRecv->msg); rpcFreeMsg(pRecv->msg);
rpcUnlockConn(pConn); rpcUnlockConn(pConn);
} else if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) { } else if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) {
...@@ -1115,12 +1115,14 @@ static void rpcProcessProbeMsg(SRecvInfo *pRecv, SRpcConn *pConn) { ...@@ -1115,12 +1115,14 @@ static void rpcProcessProbeMsg(SRecvInfo *pRecv, SRpcConn *pConn) {
rpcProcessIncomingMsg(pConn, pHead, pContext); rpcProcessIncomingMsg(pConn, pHead, pContext);
taosReleaseRef(tsRpcRefId, pConn->rid); taosReleaseRef(tsRpcRefId, pConn->rid);
} else { } else {
tError("PROBE recv probe msg get context is NULL. rid=%" PRId64 " NULL.", pConn->rid); tInfo("PROBE 0x%" PRIx64 " get reqContext by rid return NULL. pConn->rid=0x%" PRIX64, pHead->ahandle, pConn->rid);
} }
rpcUnlockConn(pConn); rpcUnlockConn(pConn);
tInfo("PROBE 0x%" PRIx64 " recv response probe msg and update lastLiveTime. pConn=%p", pHead->ahandle, pConn);
} else {
tInfo("PROBE 0x%" PRIx64 " recv response probe msg but pConn is NULL.", pHead->ahandle);
} }
tDebug("PROBE 0x%" PRIx64 " recv response probe msg and update lastLiveTime. pConn=%p", pHead->ahandle, pConn);
} }
} }
...@@ -1789,46 +1791,46 @@ bool rpcSendProbe(int64_t rpcRid, void* pPrevContext, void* pPrevConn, void* pPr ...@@ -1789,46 +1791,46 @@ bool rpcSendProbe(int64_t rpcRid, void* pPrevContext, void* pPrevConn, void* pPr
// return false can kill query // return false can kill query
bool ret = false; bool ret = false;
if(rpcRid < 0) { if(rpcRid < 0) {
tError("PROBE rpcRid=%" PRId64 " less than zero, invalid.", rpcRid); tError("PROBE rpcRid=0x%" PRIx64 " less than zero, invalid.", rpcRid);
return false; return false;
} }
// get req content // get req content
SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, rpcRid); SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, rpcRid);
if (pContext == NULL) { if (pContext == NULL) {
tError("PROBE rpcRid=%" PRId64 " get context NULL. sql finished no need send probe.", rpcRid); tError("PROBE rpcRid=0x%" PRIx64 " get context NULL. sql finished no need send probe.", rpcRid);
return true; return true;
} }
// context same // context same
if(pContext != pPrevContext) { if(pContext != pPrevContext) {
tError("PROBE rpcRid=%" PRId64 " context diff. pContext=%p pPreContent=%p", rpcRid, pContext, pPrevContext); tError("PROBE rpcRid=0x%" PRIx64 " context diff. pContext=%p pPreContent=%p", rpcRid, pContext, pPrevContext);
goto _END; goto _END;
} }
// conn same // conn same
if (pContext->pConn != pPrevConn) { if (pContext->pConn != pPrevConn) {
tError("PROBE rpcRid=%" PRId64 " connect obj diff. pContext->pConn=%p pPreConn=%p", rpcRid, pContext->pConn, pPrevConn); tError("PROBE rpcRid=0x%" PRIx64 " connect obj diff. pContext->pConn=%p pPreConn=%p", rpcRid, pContext->pConn, pPrevConn);
ret = pContext->pConn == NULL; ret = pContext->pConn == NULL;
goto _END; goto _END;
} }
// fdObj same // fdObj same
if (pContext->pConn->chandle != pPrevFdObj) { if (pContext->pConn->chandle != pPrevFdObj) {
tError("PROBE rpcRid=%" PRId64 " connect fdObj diff. pContext->pConn->chandle=%p pPrevFdObj=%p", rpcRid, pContext->pConn->chandle, pPrevFdObj); tError("PROBE rpcRid=0x%" PRIx64 " connect fdObj diff. pContext->pConn->chandle=%p pPrevFdObj=%p", rpcRid, pContext->pConn->chandle, pPrevFdObj);
goto _END; goto _END;
} }
// fd same // fd same
int32_t fd = taosGetFdID(pContext->pConn->chandle); int32_t fd = taosGetFdID(pContext->pConn->chandle);
if (fd != prevFd) { if (fd != prevFd) {
tError("PROBE rpcRid=%" PRId64 " connect fd diff.fd=%d prevFd=%d", rpcRid, fd, prevFd); tError("PROBE rpcRid=0x%" PRIx64 " connect fd diff.fd=%d prevFd=%d", rpcRid, fd, prevFd);
goto _END; goto _END;
} }
// send syn // send syn
ret = doRpcSendProbe(pContext->pConn); ret = doRpcSendProbe(pContext->pConn);
tInfo("PROBE 0x%" PRIx64 " rpcRid=%" PRId64 " send data ret=%d fd=%d.", (int64_t)pContext->ahandle, rpcRid, ret, fd); tInfo("PROBE 0x%" PRIx64 " rrpcRid=0x%" PRIx64 " send data ret=%d fd=%d.", (int64_t)pContext->ahandle, rpcRid, ret, fd);
_END: _END:
// put back req context // put back req context
...@@ -1839,13 +1841,13 @@ _END: ...@@ -1839,13 +1841,13 @@ _END:
// after sql request send , save conn info // after sql request send , save conn info
bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext, void** ppConn, void** ppFdObj, int32_t* pFd) { bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext, void** ppConn, void** ppFdObj, int32_t* pFd) {
if(rpcRid < 0) { if(rpcRid < 0) {
tError("PROBE saveSendInfo rpcRid=%" PRId64 " less than zero, invalid.", rpcRid); tError("PROBE saveSendInfo rpcRid=0x%" PRIx64 " less than zero, invalid.", rpcRid);
return false; return false;
} }
// get req content // get req content
SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, rpcRid); SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, rpcRid);
if (pContext == NULL) { if (pContext == NULL) {
tError("PROBE rpcRid=%" PRId64 " get context NULL.", rpcRid); tError("PROBE saveSendInfo rpcRid=0x%" PRIx64 " get context NULL.", rpcRid);
return false; return false;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册