提交 b14514ae 编写于 作者: A Alex Duan

feat(rpc): add probe msg test ok

上级 a9c16769
...@@ -399,7 +399,6 @@ typedef struct SSqlObj { ...@@ -399,7 +399,6 @@ typedef struct SSqlObj {
int64_t self; int64_t self;
// connect alive // connect alive
int64_t lastProbe;
int64_t lastAlive; int64_t lastAlive;
void * pPrevContext; void * pPrevContext;
} SSqlObj; } SSqlObj;
......
...@@ -312,7 +312,11 @@ static void tscAsyncResultCallback(SSchedMsg *pMsg) { ...@@ -312,7 +312,11 @@ static void tscAsyncResultCallback(SSchedMsg *pMsg) {
return; return;
} }
assert(pSql->res.code != TSDB_CODE_SUCCESS); // probe send error , but result be responsed by server async
if(pSql->res.code == TSDB_CODE_SUCCESS) {
return ;
}
if (tsShortcutFlag) { if (tsShortcutFlag) {
tscDebug("0x%" PRIx64 " async result callback, code:%s", pSql->self, tstrerror(pSql->res.code)); tscDebug("0x%" PRIx64 " async result callback, code:%s", pSql->self, tstrerror(pSql->res.code));
pSql->res.code = TSDB_CODE_SUCCESS; pSql->res.code = TSDB_CODE_SUCCESS;
......
...@@ -281,48 +281,16 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { ...@@ -281,48 +281,16 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
} }
} }
// pSql connection link is broken
bool dealConnBroken(SSqlObj * pSql) {
// check valid
if (pSql->signature != pSql) {
tscInfo("PROBE 0x%" PRIx64 " break link signature is not equal pSql. signature=%p", pSql->self, pSql->signature);
return false;
}
// set error
pSql->res.code = TSDB_CODE_RPC_CONN_BROKEN;
// cancel
if (pSql->rpcRid > 0) {
tscInfo("PROBE 0x%" PRIx64 " break link done. rpcRid=0x%" PRIx64, pSql->self, pSql->rpcRid);
rpcCancelRequest(pSql->rpcRid);
pSql->rpcRid = -1;
} else {
tscInfo("PROBE 0x%" PRIx64 " break link rpcRid <=0. rpcRid=0x%" PRIx64, pSql->self, pSql->rpcRid);
}
// error notify
tscInfo("PROBE 0x%"PRIx64" call async result error." PRIx64, pSql->self);
tscAsyncResultOnError(pSql);
return true;
}
// if return true, send probe connection msg to sever ok // if return true, send probe connection msg to sever ok
bool sendProbeConnMsg(SSqlObj* pSql) { bool sendProbeConnMsg(SSqlObj* pSql, int64_t stime) {
// TEST TODO DELETE if(stime == 0) {
tsProbeSeconds = 1; // over this value send probe msg
tsProbeKillSeconds = 2*60; // over this value query can be killed
if(pSql->stime == 0) {
// not start , no need probe // not start , no need probe
tscInfo("PROBE 0x%" PRIx64 " not start, no need probe.", pSql->self); tscInfo("PROBE 0x%" PRIx64 " not start, no need probe.", pSql->self);
return true; return true;
} }
int64_t stime = MAX(pSql->stime, pSql->lastAlive); int64_t start = MAX(stime, pSql->lastAlive);
int32_t diff = (int32_t)(taosGetTimestampMs() - stime); int32_t diff = (int32_t)(taosGetTimestampMs() - start);
if (diff < tsProbeSeconds * 1000) { if (diff < tsProbeSeconds * 1000) {
// exec time short , need not probe alive // exec time short , need not probe alive
tscInfo("PROBE 0x%" PRIx64 " not arrived probe time. cfg timeout=%ds, no need probe. lastAlive=%" PRId64 " stime=%" PRId64, \ tscInfo("PROBE 0x%" PRIx64 " not arrived probe time. cfg timeout=%ds, no need probe. lastAlive=%" PRId64 " stime=%" PRId64, \
...@@ -349,9 +317,6 @@ bool sendProbeConnMsg(SSqlObj* pSql) { ...@@ -349,9 +317,6 @@ bool sendProbeConnMsg(SSqlObj* pSql) {
tscInfo("PROBE 0x%" PRIx64 " rpcRid is -1, response ok. no need probe.", pSql->self); tscInfo("PROBE 0x%" PRIx64 " rpcRid is -1, response ok. no need probe.", pSql->self);
return true; return true;
} }
// It's long time from lastAlive, so need probe
pSql->lastProbe = taosGetTimestampMs();
bool ret = rpcSendProbe(pSql->rpcRid, pSql->pPrevContext); bool ret = rpcSendProbe(pSql->rpcRid, pSql->pPrevContext);
tscInfo("PROBE 0x%" PRIx64 " send probe msg, ret=%d rpcRid=0x%" PRIx64, pSql->self, ret, pSql->rpcRid); tscInfo("PROBE 0x%" PRIx64 " send probe msg, ret=%d rpcRid=0x%" PRIx64, pSql->self, ret, pSql->rpcRid);
...@@ -363,27 +328,49 @@ void checkBrokenQueries(STscObj *pTscObj) { ...@@ -363,27 +328,49 @@ void checkBrokenQueries(STscObj *pTscObj) {
tscDebug("PROBE checkBrokenQueries pTscObj=%p pTscObj->rid=0x%" PRIx64, pTscObj, pTscObj->rid); tscDebug("PROBE checkBrokenQueries pTscObj=%p pTscObj->rid=0x%" PRIx64, pTscObj, pTscObj->rid);
SSqlObj *pSql = pTscObj->sqlList; SSqlObj *pSql = pTscObj->sqlList;
while (pSql) { while (pSql) {
// avoid sqlobj may not be correctly removed from sql list
if (pSql->sqlstr == NULL) {
pSql = pSql->next;
continue;
}
bool kill = false;
int32_t numOfSub = pSql->subState.numOfSub; int32_t numOfSub = pSql->subState.numOfSub;
tscInfo("PROBE 0x%" PRIx64 " check sql connection alive, numOfSub=%d sql=%s", pSql->self, numOfSub, pSql->sqlstr == NULL ? "" : pSql->sqlstr); tscInfo("PROBE 0x%" PRIx64 " start checking sql connection alive, numOfSub=%d sql=%s", pSql->self, numOfSub, pSql->sqlstr == NULL ? "" : pSql->sqlstr);
if (numOfSub == 0) { if (numOfSub == 0) {
// no sub sql // no sub sql
if(!sendProbeConnMsg(pSql)) { if(!sendProbeConnMsg(pSql, pSql->stime)) {
// send failed , connect already broken // need kill
dealConnBroken(pSql); tscInfo("PROBE 0x%" PRIx64 " need break link done. rpcRid=0x%" PRIx64, pSql->self, pSql->rpcRid);
kill = true;
} }
} else {
return ; // lock subs
} pthread_mutex_lock(&pSql->subState.mutex);
if (pSql->pSubs) {
// have sub sql // have sub sql
for (int i = 0; i < numOfSub; i++) { for (int i = 0; i < numOfSub; i++) {
SSqlObj *pSubSql = pSql->pSubs[i]; SSqlObj *pSubSql = pSql->pSubs[i];
if(!sendProbeConnMsg(pSubSql)) { if(pSubSql) {
// send failed , connect already broken tscInfo("PROBE 0x%" PRIx64 " sub sql app is 0x%" PRIx64, pSql->self, pSubSql->self);
dealConnBroken(pSubSql); if(!sendProbeConnMsg(pSubSql, pSql->stime)) {
// need kill
tscInfo("PROBE 0x%" PRIx64 " i=%d sub app=0x%" PRIx64 " need break link done. rpcRid=0x%" PRIx64, pSql->self, i, pSubSql->self, pSubSql->rpcRid);
kill = true;
break;
}
}
}
} }
// unlock
pthread_mutex_unlock(&pSql->subState.mutex);
} }
// kill query
if(kill) {
taos_stop_query(pSql);
}
// move next // move next
pSql = pSql->next; pSql = pSql->next;
} }
...@@ -543,12 +530,11 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { ...@@ -543,12 +530,11 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
// check msgtype // check msgtype
if(rpcMsg->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) { if(rpcMsg->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) {
pSql->lastAlive = taosGetTimestampMs(); pSql->lastAlive = taosGetTimestampMs();
tscDebug("PROBE 0x%" PRIx64 " recv probe response msg. rpcRid=0x%" PRIx64, pSql->self, pSql->rpcRid); tscInfo("PROBE 0x%" PRIx64 " recv probe msg response. rpcRid=0x%" PRIx64, pSql->self, pSql->rpcRid);
rpcFreeCont(rpcMsg->pCont); rpcFreeCont(rpcMsg->pCont);
return ; return ;
} }
STscObj *pObj = pSql->pTscObj; STscObj *pObj = pSql->pTscObj;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
......
...@@ -6368,9 +6368,9 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { ...@@ -6368,9 +6368,9 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
// TEST TODU DELETE // TEST TODU DELETE
static int loop = 0; //static int loop = 0;
taosMsleep(1*1000); //taosMsleep(1*1000);
qInfo(" loop=%d pEnv=%p", loop++, pRuntimeEnv); //qInfo(" loop=%d pEnv=%p", loop++, pRuntimeEnv);
if (pBlock == NULL) { if (pBlock == NULL) {
......
...@@ -1083,28 +1083,32 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) { ...@@ -1083,28 +1083,32 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) {
// process probe msg , return true is probe msg, false is not probe msg // process probe msg , return true is probe msg, false is not probe msg
static void rpcProcessProbeMsg(SRecvInfo *pRecv, SRpcConn *pConn) { static void rpcProcessProbeMsg(SRecvInfo *pRecv, SRpcConn *pConn) {
SRpcHead *pHead = (SRpcHead *)pRecv->msg; SRpcHead *pHead = (SRpcHead *)pRecv->msg;
uint64_t ahandle = pHead->ahandle;
if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN) { if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN) {
// response to // response to
SRpcHead rspHead; char msg[RPC_MSG_OVERHEAD];
memset(&rspHead, 0, sizeof(SRpcHead)); SRpcHead *pRspHead;
rspHead.msgType = TSDB_MSG_TYPE_PROBE_CONN_RSP; // set msg header
rspHead.version = 1; memset(msg, 0, sizeof(SRpcHead));
rspHead.ahandle = pHead->ahandle; pRspHead = (SRpcHead *)msg;
rspHead.tranId = pHead->tranId;
rspHead.code = 0; pRspHead->msgType = TSDB_MSG_TYPE_PROBE_CONN_RSP;
rspHead.spi = pHead->spi; pRspHead->version = 1;
rspHead.linkUid = pHead->linkUid; pRspHead->ahandle = pHead->ahandle;
pRspHead->tranId = pHead->tranId;
pRspHead->code = 0;
pRspHead->linkUid = pHead->linkUid;
rpcLockConn(pConn); rpcLockConn(pConn);
rspHead.sourceId = pConn->ownId; pRspHead->sourceId = pConn->ownId;
rspHead.destId = pConn->peerId; pRspHead->destId = pConn->peerId;
memcpy(rspHead.user, pHead->user, tListLen(pHead->user)); memcpy(pRspHead->user, pHead->user, tListLen(pHead->user));
bool ret = rpcSendMsgToPeer(pConn, &rspHead, sizeof(SRpcHead)); bool ret = rpcSendMsgToPeer(pConn, pRspHead, sizeof(SRpcHead));
tInfo("PROBE 0x%" PRIx64 " recv probe msg and do response. ret=%d", pHead->ahandle, ret); tInfo("PROBE 0x%" PRIx64 " recv probe msg and do response. ret=%d", ahandle, ret);
rpcFreeMsg(pRecv->msg);
rpcUnlockConn(pConn); rpcUnlockConn(pConn);
rpcFreeMsg(pRecv->msg);
} else if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) { } else if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) {
if(pConn) { if(pConn) {
rpcLockConn(pConn); rpcLockConn(pConn);
...@@ -1116,16 +1120,16 @@ static void rpcProcessProbeMsg(SRecvInfo *pRecv, SRpcConn *pConn) { ...@@ -1116,16 +1120,16 @@ static void rpcProcessProbeMsg(SRecvInfo *pRecv, SRpcConn *pConn) {
rpcProcessIncomingMsg(pConn, pHead, pContext); rpcProcessIncomingMsg(pConn, pHead, pContext);
taosReleaseRef(tsRpcRefId, pConn->rid); taosReleaseRef(tsRpcRefId, pConn->rid);
} else { } else {
tInfo("PROBE 0x%" PRIx64 " get reqContext by rid return NULL. pConn->rid=0x%" PRIX64, pHead->ahandle, pConn->rid); tInfo("PROBE 0x%" PRIx64 " recv response probe msg but pContext is NULL. pConn->rid=0x%" PRIX64, ahandle, pConn->rid);
rpcFreeMsg(pRecv->msg);
} }
rpcUnlockConn(pConn); rpcUnlockConn(pConn);
tInfo("PROBE 0x%" PRIx64 " recv response probe msg and update lastLiveTime. pConn=%p", pHead->ahandle, pConn);
} else { } else {
tInfo("PROBE 0x%" PRIx64 " recv response probe msg but pConn is NULL.", pHead->ahandle); tInfo("PROBE 0x%" PRIx64 " recv response probe msg but pConn is NULL.", ahandle);
rpcFreeMsg(pRecv->msg);
} }
} }
} }
static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
...@@ -1798,7 +1802,7 @@ bool rpcSendProbe(int64_t rpcRid, void* pPrevContext) { ...@@ -1798,7 +1802,7 @@ bool rpcSendProbe(int64_t rpcRid, void* pPrevContext) {
bool ret = false; bool ret = false;
if(rpcRid < 0) { if(rpcRid < 0) {
tError("PROBE rpcRid=0x%" PRIx64 " less than zero, invalid.", rpcRid); tError("PROBE rpcRid=0x%" PRIx64 " less than zero, invalid.", rpcRid);
return false; return true;
} }
// get req content // get req content
...@@ -1815,22 +1819,25 @@ bool rpcSendProbe(int64_t rpcRid, void* pPrevContext) { ...@@ -1815,22 +1819,25 @@ bool rpcSendProbe(int64_t rpcRid, void* pPrevContext) {
} }
// conn same // conn same
if (pContext->pConn != pContext->sendInfo.pConn) { if(pContext->pConn == NULL) {
tError("PROBE rpcRid=0x%" PRIx64 " connect obj diff. pContext->pConn=%p pPreConn=%p", rpcRid, pContext->pConn, pContext->sendInfo.pConn); tInfo("PROBE rpcRid=0x%" PRIx64 " connect obj is NULL. ", rpcRid);
ret = pContext->pConn == NULL; ret = true;
goto _END;
} else if (pContext->pConn != pContext->sendInfo.pConn) {
tInfo("PROBE rpcRid=0x%" PRIx64 " connect obj diff. pContext->pConn=%p pPreConn=%p", rpcRid, pContext->pConn, pContext->sendInfo.pConn);
goto _END; goto _END;
} }
// fdObj same // fdObj same
if (pContext->pConn->chandle != pContext->sendInfo.pFdObj) { if (pContext->pConn->chandle != pContext->sendInfo.pFdObj) {
tError("PROBE rpcRid=0x%" PRIx64 " connect fdObj diff. pContext->pConn->chandle=%p pPrevFdObj=%p", rpcRid, pContext->pConn->chandle, pContext->sendInfo.pFdObj); tInfo("PROBE rpcRid=0x%" PRIx64 " connect fdObj diff. pContext->pConn->chandle=%p pPrevFdObj=%p", rpcRid, pContext->pConn->chandle, pContext->sendInfo.pFdObj);
goto _END; goto _END;
} }
// fd same // fd same
int32_t fd = taosGetFdID(pContext->pConn->chandle); int32_t fd = taosGetFdID(pContext->pConn->chandle);
if (fd != pContext->sendInfo.fd) { if (fd != pContext->sendInfo.fd) {
tError("PROBE rpcRid=0x%" PRIx64 " connect fd diff.fd=%d prevFd=%d", rpcRid, fd, pContext->sendInfo.fd); tInfo("PROBE rpcRid=0x%" PRIx64 " connect fd diff.fd=%d prevFd=%d", rpcRid, fd, pContext->sendInfo.fd);
goto _END; goto _END;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册