From a9c16769170e162bc70e16b6e6d16f382ce45b66 Mon Sep 17 00:00:00 2001
From: Alex Duan <417921451@qq.com>
Date: Sun, 31 Jul 2022 21:31:04 +0800
Subject: [PATCH] feat(rpc): add probe msg

---
Review notes (text between the "---" separator and the first "diff --git"
is commentary, not part of the commit):

 * Purpose of the patch: the client keeps only the request context pointer
   (pPrevContext); the conn / fd-object / fd triple that used to live in
   SSqlObj is now stored in SRpcReqContext.sendInfo and compared inside
   rpcSendProbe.
 * rpcMain.c, SRpcReqContext: sendInfo is declared BEFORE msg[0]. msg[0] is
   the zero-length trailing member ("RpcHead starts from here"); a member
   declared after it would share its address with the message bytes, so the
   sendInfo stores in rpcSendReqToServer would overwrite the message.
 * tscServer.c, dealConnBroken: removed a stray PRIx64 that was concatenated
   onto the end of the format string without a '%' conversion.
 * rpcMain.c, rpcSendProbe: a NULL pContext->pConn is now handled by the
   "conn same" branch (ret stays true, as before) because the checks that
   follow dereference pContext->pConn.
 * qExecutor.c still carries the temporary sleep marked "TEST TODU DELETE";
   it is left as submitted.
 * Line breaks, indentation and blank context lines were reconstructed from
   the hunk line counts and may differ from the tree; the "index" hashes were
   not regenerated. Apply with whitespace tolerance.

 src/client/inc/tsclient.h  |  3 ---
 src/client/src/tscServer.c | 22 +++++++++++-----------
 src/inc/trpc.h             | 11 +++++++++--
 src/query/src/qExecutor.c  |  2 +-
 src/rpc/src/rpcMain.c      | 33 ++++++++++++++-------------------
 5 files changed, 35 insertions(+), 36 deletions(-)

diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h
index 3e9ad0699a..88d1916724 100644
--- a/src/client/inc/tsclient.h
+++ b/src/client/inc/tsclient.h
@@ -402,9 +402,6 @@ typedef struct SSqlObj {
   int64_t lastProbe;
   int64_t lastAlive;
   void * pPrevContext;
-  void * pPrevConn;
-  void * pPrevFdObj;
-  int32_t prevFd;
 } SSqlObj;
 
 typedef struct SSqlStream {
diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c
index 84fe8e31d8..0d3471126c 100644
--- a/src/client/src/tscServer.c
+++ b/src/client/src/tscServer.c
@@ -303,7 +303,7 @@ bool dealConnBroken(SSqlObj * pSql) {
   }
 
   // error notify
-  tscInfo("PROBE 0x%"PRIx64" async result error. rpcRid=0x%" PRIx64, pSql->self, pSql->rpcRid);
+  tscInfo("PROBE 0x%" PRIx64 " call async result error.", pSql->self);
   tscAsyncResultOnError(pSql);
 
   return true;
@@ -325,23 +325,22 @@ bool sendProbeConnMsg(SSqlObj* pSql) {
   int32_t diff = (int32_t)(taosGetTimestampMs() - stime);
   if (diff < tsProbeSeconds * 1000) {
     // exec time short , need not probe alive
-    tscInfo("PROBE 0x%" PRIx64 "not arrived probe time. timeout=%ds, no need probe. lastAlive=%" PRId64 " stime=%" PRId64, \
+    tscInfo("PROBE 0x%" PRIx64 " not arrived probe time. cfg timeout=%ds, no need probe. lastAlive=%" PRId64 " stime=%" PRId64, \
             pSql->self, tsProbeSeconds, pSql->lastAlive, pSql->stime);
     return true;
   }
 
   if (diff > tsProbeKillSeconds * 1000) {
     // need kill query
-    tscInfo("PROBE 0x%" PRIx64 "kill query by probe. because arrived kill time. timeout=%ds lastAlive=%" PRId64 " stime=%" PRId64, \
-            pSql->self, tsProbeKillSeconds, pSql->lastAlive, pSql->stime);
+    tscInfo("PROBE 0x%" PRIx64 " kill query by probe. because arrived kill time. time=%ds cfg timeout=%ds lastAlive=%" PRId64 " stime=%" PRId64, \
+            pSql->self, diff/1000, tsProbeKillSeconds, pSql->lastAlive, pSql->stime);
 
     return false;
   }
 
-  if (pSql->pPrevContext == NULL || pSql->pPrevConn == NULL || pSql->pPrevFdObj == NULL || pSql->prevFd <= 0) {
+  if (pSql->pPrevContext == NULL) {
     // last connect info save uncompletely, so can't probe
-    tscInfo("PROBE 0x%" PRIx64 "save last connect info uncompletely. prev context=%p conn=%p fdobj=%p fd=%d", \
-            pSql->self, pSql->pPrevContext, pSql->pPrevConn, pSql->pPrevFdObj, pSql->prevFd);
+    tscInfo("PROBE 0x%" PRIx64 " save last connect info uncompletely. prev context is null", pSql->self);
     return true;
   }
 
@@ -354,8 +353,8 @@ bool sendProbeConnMsg(SSqlObj* pSql) {
 
   // It's long time from lastAlive, so need probe
   pSql->lastProbe = taosGetTimestampMs();
-  bool ret = rpcSendProbe(pSql->rpcRid, pSql->pPrevContext, pSql->pPrevConn, pSql->pPrevFdObj, pSql->prevFd);
-  tscInfo("PROBE 0x%" PRIx64 " rpcRid=0x%" PRIx64 " send probe msg, ret=%d", pSql->self, pSql->rpcRid, ret);
+  bool ret = rpcSendProbe(pSql->rpcRid, pSql->pPrevContext);
+  tscInfo("PROBE 0x%" PRIx64 " send probe msg, ret=%d rpcRid=0x%" PRIx64, pSql->self, ret, pSql->rpcRid);
   return ret;
 }
 
@@ -365,7 +364,7 @@ void checkBrokenQueries(STscObj *pTscObj) {
   SSqlObj *pSql = pTscObj->sqlList;
   while (pSql) {
     int32_t numOfSub = pSql->subState.numOfSub;
-    tscInfo("PROBE 0x%" PRIx64 " numOfSub=%d sql=%s", pSql->self, numOfSub, pSql->sqlstr == NULL ? "" : pSql->sqlstr);
+    tscInfo("PROBE 0x%" PRIx64 " check sql connection alive, numOfSub=%d sql=%s", pSql->self, numOfSub, pSql->sqlstr == NULL ? "" : pSql->sqlstr);
     if (numOfSub == 0) {
       // no sub sql
       if(!sendProbeConnMsg(pSql)) {
@@ -460,9 +459,10 @@ int tscSendMsgToServer(SSqlObj *pSql) {
     return TSDB_CODE_FAILED;
   }
 
+
   if(rpcSendRequest(pObj->pRpcObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid)) {
     if(pSql->cmd.command != TSDB_SQL_HB)
-      rpcSaveSendInfo(pSql->rpcRid, &pSql->pPrevContext, &pSql->pPrevConn, &pSql->pPrevFdObj, &pSql->prevFd);
+      rpcSaveSendInfo(pSql->rpcRid, &pSql->pPrevContext);
     return TSDB_CODE_SUCCESS;
   }
 
diff --git a/src/inc/trpc.h b/src/inc/trpc.h
index 3e1edc2c0a..940a1bf044 100644
--- a/src/inc/trpc.h
+++ b/src/inc/trpc.h
@@ -78,6 +78,13 @@ typedef struct SRpcInit {
   int (*afp)(char *tableId, char *spi, char *encrypt, char *secret, char *ckey);
 } SRpcInit;
 
+typedef struct SSendInfo {
+  void *pContext;
+  void *pConn;
+  void *pFdObj;
+  int32_t fd;
+} SSendInfo;
+
 int32_t rpcInit();
 void rpcCleanup();
 void *rpcOpen(const SRpcInit *pRpc);
@@ -94,9 +101,9 @@ int rpcReportProgress(void *pConn, char *pCont, int contLen);
 void rpcCancelRequest(int64_t rid);
 int32_t rpcUnusedSession(void * rpcInfo, bool bLock);
 // send rpc Refid connection probe alive message
-bool rpcSendProbe(int64_t rpcRid, void* pPrevContext, void* pPrevConn, void* pPrevFdObj, int32_t prevFd);
+bool rpcSendProbe(int64_t rpcRid, void* pPrevContext);
 // after sql request send , save conn info
-bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext, void** ppConn, void** ppFdObj, int32_t* pFd);
+bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext);
 
 #ifdef __cplusplus
 }
diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c
index cc9464ad72..6f7e77d915 100644
--- a/src/query/src/qExecutor.c
+++ b/src/query/src/qExecutor.c
@@ -6369,7 +6369,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
 
   // TEST TODU DELETE
   static int loop = 0;
-  taosMsleep(3*1000);
+  taosMsleep(1*1000);
   qInfo(" loop=%d pEnv=%p", loop++, pRuntimeEnv);
 
 
diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c
index 11a8b4b70b..e8564ac8ed 100644
--- a/src/rpc/src/rpcMain.c
+++ b/src/rpc/src/rpcMain.c
@@ -87,6 +87,7 @@ typedef struct {
   tsem_t *pSem; // for synchronous API
   SRpcEpSet *pSet; // for synchronous API
+  SSendInfo sendInfo; // save last send information
   char msg[0]; // RpcHead starts from here
 } SRpcReqContext;
 
 typedef struct SRpcConn {
@@ -1412,6 +1413,11 @@ static bool rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
   if(pContext)
     pConn->rid = pContext->rid;
 
+  // save
+  pContext->sendInfo.pConn = pConn;
+  pContext->sendInfo.pFdObj = pConn->chandle;
+  pContext->sendInfo.fd = taosGetFdID(pConn->chandle);
+
   bool ret = rpcSendMsgToPeer(pConn, msg, msgLen);
   if (pConn->connType != RPC_CONN_TCPC)
     taosTmrReset(rpcProcessRetryTimer, tsRpcTimer, pConn, pRpc->tmrCtrl, &pConn->pTimer);
@@ -1787,7 +1793,7 @@ bool doRpcSendProbe(SRpcConn *pConn) {
 }
 
 // send server syn
-bool rpcSendProbe(int64_t rpcRid, void* pPrevContext, void* pPrevConn, void* pPrevFdObj, int32_t prevFd) {
+bool rpcSendProbe(int64_t rpcRid, void* pPrevContext) {
   // return false can kill query
   bool ret = false;
   if(rpcRid < 0) {
@@ -1809,28 +1815,27 @@ bool rpcSendProbe(int64_t rpcRid, void* pPrevContext, void* pPrevConn, void* pPr
   }
 
   // conn same
-  if (pContext->pConn != pPrevConn) {
-    tError("PROBE rpcRid=0x%" PRIx64 " connect obj diff. pContext->pConn=%p pPreConn=%p", rpcRid, pContext->pConn, pPrevConn);
+  if (pContext->pConn == NULL || pContext->pConn != pContext->sendInfo.pConn) {
+    tError("PROBE rpcRid=0x%" PRIx64 " connect obj diff. pContext->pConn=%p pPreConn=%p", rpcRid, pContext->pConn, pContext->sendInfo.pConn);
     ret = pContext->pConn == NULL;
     goto _END;
   }
 
   // fdObj same
-  if (pContext->pConn->chandle != pPrevFdObj) {
-    tError("PROBE rpcRid=0x%" PRIx64 " connect fdObj diff. pContext->pConn->chandle=%p pPrevFdObj=%p", rpcRid, pContext->pConn->chandle, pPrevFdObj);
+  if (pContext->pConn->chandle != pContext->sendInfo.pFdObj) {
+    tError("PROBE rpcRid=0x%" PRIx64 " connect fdObj diff. pContext->pConn->chandle=%p pPrevFdObj=%p", rpcRid, pContext->pConn->chandle, pContext->sendInfo.pFdObj);
     goto _END;
   }
 
   // fd same
   int32_t fd = taosGetFdID(pContext->pConn->chandle);
-  if (fd != prevFd) {
-    tError("PROBE rpcRid=0x%" PRIx64 " connect fd diff.fd=%d prevFd=%d", rpcRid, fd, prevFd);
+  if (fd != pContext->sendInfo.fd) {
+    tError("PROBE rpcRid=0x%" PRIx64 " connect fd diff.fd=%d prevFd=%d", rpcRid, fd, pContext->sendInfo.fd);
     goto _END;
   }
 
   // send syn
   ret = doRpcSendProbe(pContext->pConn);
-  tInfo("PROBE 0x%" PRIx64 " rrpcRid=0x%" PRIx64 " send data ret=%d fd=%d.", (int64_t)pContext->ahandle, rpcRid, ret, fd);
 
 _END:
   // put back req context
@@ -1839,7 +1844,7 @@ _END:
 }
 
 // after sql request send , save conn info
-bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext, void** ppConn, void** ppFdObj, int32_t* pFd) {
+bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext) {
   if(rpcRid < 0) {
     tError("PROBE saveSendInfo rpcRid=0x%" PRIx64 " less than zero, invalid.", rpcRid);
     return false;
@@ -1851,18 +1856,8 @@ bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext, void** ppConn, void** ppF
     return false;
   }
 
-  if (pContext->pConn == NULL || pContext->pConn->chandle == NULL) {
-    return false;
-  }
-
   if (ppContext)
     *ppContext = pContext;
-  if (ppConn)
-    *ppConn = pContext->pConn;
-  if (ppFdObj && pContext->pConn)
-    *ppFdObj = pContext->pConn->chandle;
-  if (pFd && pContext->pConn)
-    *pFd = taosGetFdID(pContext->pConn->chandle);
 
   taosReleaseRef(tsRpcRefId, rpcRid);
   return true;
-- 
GitLab