From 6ecc4abc8e3ddbe74e2125318e3174e4bcbe10c1 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Thu, 14 Jul 2022 17:29:29 +0800 Subject: [PATCH] fix(rpc): probe alive test --- src/client/src/tscServer.c | 5 +++-- src/dnode/inc/dnodeVRead.h | 2 ++ src/dnode/src/dnodeShell.c | 2 +- src/dnode/src/dnodeVRead.c | 16 +++++++++++++++- src/inc/trpc.h | 2 +- src/rpc/src/rpcMain.c | 3 ++- src/vnode/src/vnodeRead.c | 3 ++- 7 files changed, 26 insertions(+), 7 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 404b074f66..de57f62fbe 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -291,7 +291,7 @@ bool dealConnBroken(SSqlObj * pSql) { // if return true, send probe connection msg to sever ok bool sendProbeConnMsg(SSqlObj* pSql) { // check time out - int32_t probeTimeout = 60*1000; // over this value send probe msg + int32_t probeTimeout = 1*1000; // over this value send probe msg int32_t killTimeout = 3*60*1000; // over this value query can be killed if(pSql->stime == 0) { // not start , no need probe @@ -430,7 +430,8 @@ int tscSendMsgToServer(SSqlObj *pSql) { } if(rpcSendRequest(pObj->pRpcObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid)) { - saveSendInfo(pSql->rpcRid, &pSql->pPrevContext, &pSql->pPrevConn, &pSql->pPrevFdObj, &pSql->prevFd); + if(pSql->cmd.command != TSDB_SQL_HB) + rpcSaveSendInfo(pSql->rpcRid, &pSql->pPrevContext, &pSql->pPrevConn, &pSql->pPrevFdObj, &pSql->prevFd); return TSDB_CODE_SUCCESS; } diff --git a/src/dnode/inc/dnodeVRead.h b/src/dnode/inc/dnodeVRead.h index 9171026f21..b467e93885 100644 --- a/src/dnode/inc/dnodeVRead.h +++ b/src/dnode/inc/dnodeVRead.h @@ -28,6 +28,8 @@ void * dnodeAllocVQueryQueue(void *pVnode); void * dnodeAllocVFetchQueue(void *pVnode); void dnodeFreeVQueryQueue(void *pQqueue); void dnodeFreeVFetchQueue(void *pFqueue); +// reponse probe connection msg +void dnodeResponseProbeMsg(SRpcMsg *pMsg); diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index bea325c053..0f536a84cc 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -78,7 +78,7 @@ int32_t dnodeInitShell() { dnodeProcessShellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeSendStartupStep; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_PROBE_CONN] = dnodeDispatchToVReadQueue; + dnodeProcessShellMsgFp[TSDB_MSG_TYPE_PROBE_CONN] = dnodeResponseProbeMsg; int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0); if (numOfThreads < 1) { diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c index 8eab67209a..544961adfc 100644 --- a/src/dnode/src/dnodeVRead.c +++ b/src/dnode/src/dnodeVRead.c @@ -143,7 +143,11 @@ static void *dnodeProcessReadQueue(void *wparam) { int32_t code = vnodeProcessRead(pVnode, pRead); - if (qtype == TAOS_QTYPE_RPC && code != TSDB_CODE_QRY_NOT_READY) { + if(code == 9999) { + dInfo(" ******* doNotRsp Test **** msg:%p, app:%p type:%s will be processed in vquery queue, qtype:%d", pRead, pRead->rpcAhandle,taosMsg[pRead->msgType], qtype); + printf(" ******* doNotRsp Test **** msg:%p, app:%p type:%s will be processed in vquery queue, qtype:%d", pRead, pRead->rpcAhandle,taosMsg[pRead->msgType], qtype); + + } else if (qtype == TAOS_QTYPE_RPC && code != TSDB_CODE_QRY_NOT_READY) { dnodeSendRpcVReadRsp(pVnode, pRead, code); } else { if (code == TSDB_CODE_QRY_HAS_RSP) { @@ -159,3 +163,13 @@ static void *dnodeProcessReadQueue(void *wparam) { return NULL; } + +// reponse probe connection msg +void dnodeResponseProbeMsg(SRpcMsg *pMsg) { + // check probe conn msg + if(pMsg->msgType == TSDB_MSG_TYPE_PROBE_CONN) { + SRpcMsg rpcRsp = {.handle = pMsg->handle, .code = 0, .msgType = TSDB_MSG_TYPE_PROBE_CONN_RSP}; + rpcSendResponse(&rpcRsp); + return ; + } +} diff --git a/src/inc/trpc.h b/src/inc/trpc.h index 95b84d66eb..3e1edc2c0a 100644 --- a/src/inc/trpc.h +++ b/src/inc/trpc.h @@ -96,7 +96,7 @@ int32_t rpcUnusedSession(void * rpcInfo, bool bLock); // send rpc Refid connection probe alive message bool rpcSendProbe(int64_t rpcRid, void* pPrevContext, void* pPrevConn, void* pPrevFdObj, int32_t prevFd); // after sql request send , save conn info -bool saveSendInfo(int64_t rpcRid, void** ppContext, void** ppConn, void** ppFdObj, int32_t* pFd); +bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext, void** ppConn, void** ppFdObj, int32_t* pFd); #ifdef __cplusplus } diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index a950bea15c..a8cd533e36 100644 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -1698,6 +1698,7 @@ bool doRpcSendProbe(SRpcConn *pConn) { memset(msg, 0, sizeof(SRpcHead)); pHead = (SRpcHead *)msg; pHead->version = 1; + pHead->msgVer = htonl(tsVersion >> 8); pHead->msgType = TSDB_MSG_TYPE_PROBE_CONN; pHead->spi = pConn->spi; pHead->encrypt = 0; @@ -1765,7 +1766,7 @@ _END: } // after sql request send , save conn info -bool saveSendInfo(int64_t rpcRid, void** ppContext, void** ppConn, void** ppFdObj, int32_t* pFd) { +bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext, void** ppConn, void** ppFdObj, int32_t* pFd) { if(rpcRid < 0) { tError("ACK saveSendInfo rpcRid=%" PRId64 " less than zero, invalid.", rpcRid); return false; diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index e8495cac6d..8f3c7a399a 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -339,8 +339,9 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, freehandle); } - } + // TEST CODE + //code = 9999; } return code; -- GitLab