diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 88d1916724d3762682103aaf65ff1901a517e6fc..5a36256538a3e2b9325300bbce39e9d31f685515 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -399,7 +399,6 @@ typedef struct SSqlObj { int64_t self; // connect alive - int64_t lastProbe; int64_t lastAlive; void * pPrevContext; } SSqlObj; diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index df1b98478c2244b15b0170e310565d8a6cddc314..759a301f2ab1a617789388f9192be7f7a196d605 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -312,7 +312,11 @@ static void tscAsyncResultCallback(SSchedMsg *pMsg) { return; } - assert(pSql->res.code != TSDB_CODE_SUCCESS); + // probe send error , but result be responsed by server async + if(pSql->res.code == TSDB_CODE_SUCCESS) { + return ; + } + if (tsShortcutFlag) { tscDebug("0x%" PRIx64 " async result callback, code:%s", pSql->self, tstrerror(pSql->res.code)); pSql->res.code = TSDB_CODE_SUCCESS; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 0d3471126cc7e087ea4f5430b21a026f2a7ba92b..552dee08ee77779ef97ac648e6ab2b6cf08339c1 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -281,48 +281,16 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { } } -// pSql connection link is broken -bool dealConnBroken(SSqlObj * pSql) { - // check valid - - if (pSql->signature != pSql) { - tscInfo("PROBE 0x%" PRIx64 " break link signature is not equal pSql. signature=%p", pSql->self, pSql->signature); - return false; - } - - // set error - pSql->res.code = TSDB_CODE_RPC_CONN_BROKEN; - - // cancel - if (pSql->rpcRid > 0) { - tscInfo("PROBE 0x%" PRIx64 " break link done. rpcRid=0x%" PRIx64, pSql->self, pSql->rpcRid); - rpcCancelRequest(pSql->rpcRid); - pSql->rpcRid = -1; - } else { - tscInfo("PROBE 0x%" PRIx64 " break link rpcRid <=0. rpcRid=0x%" PRIx64, pSql->self, pSql->rpcRid); - } - - // error notify - tscInfo("PROBE 0x%"PRIx64" call async result error." PRIx64, pSql->self); - tscAsyncResultOnError(pSql); - - return true; -} - // if return true, send probe connection msg to sever ok -bool sendProbeConnMsg(SSqlObj* pSql) { - // TEST TODO DELETE - tsProbeSeconds = 1; // over this value send probe msg - tsProbeKillSeconds = 2*60; // over this value query can be killed - - if(pSql->stime == 0) { +bool sendProbeConnMsg(SSqlObj* pSql, int64_t stime) { + if(stime == 0) { // not start , no need probe tscInfo("PROBE 0x%" PRIx64 " not start, no need probe.", pSql->self); return true; } - int64_t stime = MAX(pSql->stime, pSql->lastAlive); - int32_t diff = (int32_t)(taosGetTimestampMs() - stime); + int64_t start = MAX(stime, pSql->lastAlive); + int32_t diff = (int32_t)(taosGetTimestampMs() - start); if (diff < tsProbeSeconds * 1000) { // exec time short , need not probe alive tscInfo("PROBE 0x%" PRIx64 " not arrived probe time. cfg timeout=%ds, no need probe. lastAlive=%" PRId64 " stime=%" PRId64, \ @@ -349,9 +317,6 @@ bool sendProbeConnMsg(SSqlObj* pSql) { tscInfo("PROBE 0x%" PRIx64 " rpcRid is -1, response ok. no need probe.", pSql->self); return true; } - - // It's long time from lastAlive, so need probe - pSql->lastProbe = taosGetTimestampMs(); bool ret = rpcSendProbe(pSql->rpcRid, pSql->pPrevContext); tscInfo("PROBE 0x%" PRIx64 " send probe msg, ret=%d rpcRid=0x%" PRIx64, pSql->self, ret, pSql->rpcRid); @@ -363,27 +328,49 @@ void checkBrokenQueries(STscObj *pTscObj) { tscDebug("PROBE checkBrokenQueries pTscObj=%p pTscObj->rid=0x%" PRIx64, pTscObj, pTscObj->rid); SSqlObj *pSql = pTscObj->sqlList; while (pSql) { + // avoid sqlobj may not be correctly removed from sql list + if (pSql->sqlstr == NULL) { + pSql = pSql->next; + continue; + } + + bool kill = false; int32_t numOfSub = pSql->subState.numOfSub; - tscInfo("PROBE 0x%" PRIx64 " check sql connection alive, numOfSub=%d sql=%s", pSql->self, numOfSub, pSql->sqlstr == NULL ? "" : pSql->sqlstr); + tscInfo("PROBE 0x%" PRIx64 " start checking sql connection alive, numOfSub=%d sql=%s", pSql->self, numOfSub, pSql->sqlstr == NULL ? "" : pSql->sqlstr); if (numOfSub == 0) { // no sub sql - if(!sendProbeConnMsg(pSql)) { - // send failed , connect already broken - dealConnBroken(pSql); + if(!sendProbeConnMsg(pSql, pSql->stime)) { + // need kill + tscInfo("PROBE 0x%" PRIx64 " need break link done. rpcRid=0x%" PRIx64, pSql->self, pSql->rpcRid); + kill = true; } - - return ; - } - - // have sub sql - for (int i = 0; i < numOfSub; i++) { - SSqlObj *pSubSql = pSql->pSubs[i]; - if(!sendProbeConnMsg(pSubSql)) { - // send failed , connect already broken - dealConnBroken(pSubSql); + } else { + // lock subs + pthread_mutex_lock(&pSql->subState.mutex); + if (pSql->pSubs) { + // have sub sql + for (int i = 0; i < numOfSub; i++) { + SSqlObj *pSubSql = pSql->pSubs[i]; + if(pSubSql) { + tscInfo("PROBE 0x%" PRIx64 " sub sql app is 0x%" PRIx64, pSql->self, pSubSql->self); + if(!sendProbeConnMsg(pSubSql, pSql->stime)) { + // need kill + tscInfo("PROBE 0x%" PRIx64 " i=%d sub app=0x%" PRIx64 " need break link done. rpcRid=0x%" PRIx64, pSql->self, i, pSubSql->self, pSubSql->rpcRid); + kill = true; + break; + } + } + } } + // unlock + pthread_mutex_unlock(&pSql->subState.mutex); } + // kill query + if(kill) { + taos_stop_query(pSql); + } + // move next pSql = pSql->next; } @@ -543,12 +530,11 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { // check msgtype if(rpcMsg->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) { pSql->lastAlive = taosGetTimestampMs(); - tscDebug("PROBE 0x%" PRIx64 " recv probe response msg. rpcRid=0x%" PRIx64, pSql->self, pSql->rpcRid); + tscInfo("PROBE 0x%" PRIx64 " recv probe msg response. rpcRid=0x%" PRIx64, pSql->self, pSql->rpcRid); rpcFreeCont(rpcMsg->pCont); return ; } - STscObj *pObj = pSql->pTscObj; SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 6f7e77d91501e349d9a8fe8dd5e7939d9e375b00..af859de8a1fa95fe6565380aca949bdc4f51e2f6 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -6368,9 +6368,9 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); // TEST TODU DELETE - static int loop = 0; - taosMsleep(1*1000); - qInfo(" loop=%d pEnv=%p", loop++, pRuntimeEnv); + //static int loop = 0; + //taosMsleep(1*1000); + //qInfo(" loop=%d pEnv=%p", loop++, pRuntimeEnv); if (pBlock == NULL) { diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index e8564ac8edae857d98cf3d370c86592da9ed45ab..6a8a47c69a949296dfd92feeaa514314dc265168 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -1083,28 +1083,32 @@ static void rpcProcessBrokenLink(SRpcConn *pConn) { // process probe msg , return true is probe msg, false is not probe msg static void rpcProcessProbeMsg(SRecvInfo *pRecv, SRpcConn *pConn) { SRpcHead *pHead = (SRpcHead *)pRecv->msg; + uint64_t ahandle = pHead->ahandle; if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN) { // response to - SRpcHead rspHead; - memset(&rspHead, 0, sizeof(SRpcHead)); + char msg[RPC_MSG_OVERHEAD]; + SRpcHead *pRspHead; - rspHead.msgType = TSDB_MSG_TYPE_PROBE_CONN_RSP; - rspHead.version = 1; - rspHead.ahandle = pHead->ahandle; - rspHead.tranId = pHead->tranId; - rspHead.code = 0; - rspHead.spi = pHead->spi; - rspHead.linkUid = pHead->linkUid; + // set msg header + memset(msg, 0, sizeof(SRpcHead)); + pRspHead = (SRpcHead *)msg; + + pRspHead->msgType = TSDB_MSG_TYPE_PROBE_CONN_RSP; + pRspHead->version = 1; + pRspHead->ahandle = pHead->ahandle; + pRspHead->tranId = pHead->tranId; + pRspHead->code = 0; + pRspHead->linkUid = pHead->linkUid; rpcLockConn(pConn); - rspHead.sourceId = pConn->ownId; - rspHead.destId = pConn->peerId; - memcpy(rspHead.user, pHead->user, tListLen(pHead->user)); + pRspHead->sourceId = pConn->ownId; + pRspHead->destId = pConn->peerId; + memcpy(pRspHead->user, pHead->user, tListLen(pHead->user)); - bool ret = rpcSendMsgToPeer(pConn, &rspHead, sizeof(SRpcHead)); - tInfo("PROBE 0x%" PRIx64 " recv probe msg and do response. ret=%d", pHead->ahandle, ret); - rpcFreeMsg(pRecv->msg); + bool ret = rpcSendMsgToPeer(pConn, pRspHead, sizeof(SRpcHead)); + tInfo("PROBE 0x%" PRIx64 " recv probe msg and do response. ret=%d", ahandle, ret); rpcUnlockConn(pConn); + rpcFreeMsg(pRecv->msg); } else if (pHead->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) { if(pConn) { rpcLockConn(pConn); @@ -1116,16 +1120,16 @@ static void rpcProcessProbeMsg(SRecvInfo *pRecv, SRpcConn *pConn) { rpcProcessIncomingMsg(pConn, pHead, pContext); taosReleaseRef(tsRpcRefId, pConn->rid); } else { - tInfo("PROBE 0x%" PRIx64 " get reqContext by rid return NULL. pConn->rid=0x%" PRIX64, pHead->ahandle, pConn->rid); + tInfo("PROBE 0x%" PRIx64 " recv response probe msg but pContext is NULL. pConn->rid=0x%" PRIX64, ahandle, pConn->rid); + rpcFreeMsg(pRecv->msg); } rpcUnlockConn(pConn); - tInfo("PROBE 0x%" PRIx64 " recv response probe msg and update lastLiveTime. pConn=%p", pHead->ahandle, pConn); } else { - tInfo("PROBE 0x%" PRIx64 " recv response probe msg but pConn is NULL.", pHead->ahandle); + tInfo("PROBE 0x%" PRIx64 " recv response probe msg but pConn is NULL.", ahandle); + rpcFreeMsg(pRecv->msg); } } - } static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) { @@ -1798,7 +1802,7 @@ bool rpcSendProbe(int64_t rpcRid, void* pPrevContext) { bool ret = false; if(rpcRid < 0) { tError("PROBE rpcRid=0x%" PRIx64 " less than zero, invalid.", rpcRid); - return false; + return true; } // get req content @@ -1815,22 +1819,25 @@ bool rpcSendProbe(int64_t rpcRid, void* pPrevContext) { } // conn same - if (pContext->pConn != pContext->sendInfo.pConn) { - tError("PROBE rpcRid=0x%" PRIx64 " connect obj diff. pContext->pConn=%p pPreConn=%p", rpcRid, pContext->pConn, pContext->sendInfo.pConn); - ret = pContext->pConn == NULL; + if(pContext->pConn == NULL) { + tInfo("PROBE rpcRid=0x%" PRIx64 " connect obj is NULL. ", rpcRid); + ret = true; + goto _END; + } else if (pContext->pConn != pContext->sendInfo.pConn) { + tInfo("PROBE rpcRid=0x%" PRIx64 " connect obj diff. pContext->pConn=%p pPreConn=%p", rpcRid, pContext->pConn, pContext->sendInfo.pConn); goto _END; } // fdObj same if (pContext->pConn->chandle != pContext->sendInfo.pFdObj) { - tError("PROBE rpcRid=0x%" PRIx64 " connect fdObj diff. pContext->pConn->chandle=%p pPrevFdObj=%p", rpcRid, pContext->pConn->chandle, pContext->sendInfo.pFdObj); + tInfo("PROBE rpcRid=0x%" PRIx64 " connect fdObj diff. pContext->pConn->chandle=%p pPrevFdObj=%p", rpcRid, pContext->pConn->chandle, pContext->sendInfo.pFdObj); goto _END; } // fd same int32_t fd = taosGetFdID(pContext->pConn->chandle); if (fd != pContext->sendInfo.fd) { - tError("PROBE rpcRid=0x%" PRIx64 " connect fd diff.fd=%d prevFd=%d", rpcRid, fd, pContext->sendInfo.fd); + tInfo("PROBE rpcRid=0x%" PRIx64 " connect fd diff.fd=%d prevFd=%d", rpcRid, fd, pContext->sendInfo.fd); goto _END; }