diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h
index 2ada7b32e8ad74de1a3436139cc59481d674667c..f26739a3376470b40ffccae1f69f1064553af78e 100644
--- a/src/client/inc/tsclient.h
+++ b/src/client/inc/tsclient.h
@@ -397,6 +397,9 @@ typedef struct SSqlObj {
   int32_t          retryReason;  // previous error code
   struct SSqlObj  *prev, *next;
   int64_t          self;
+  // connection-alive probing state
+  int64_t          lastUpdate;   // time (ms) the last probe response was received
+  uint8_t          noAckCnt;     // probe attempts since the last probe response
 } SSqlObj;
 
 typedef struct SSqlStream {
diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c
index e42f73fb327b7a0c85741fd1edbcdc21845c5488..9e49c09550c75b41ae02284ea18c1258593e81bc 100644
--- a/src/client/src/tscServer.c
+++ b/src/client/src/tscServer.c
@@ -281,6 +281,54 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
   }
 }
 
+// Called when the connection used by pSql is found to be broken.
+// Placeholder: no recovery is implemented yet, it always returns true.
+bool dealConnBroken(SSqlObj *pSql) {
+  return true;
+}
+
+// Probe the server connection used by pSql.
+// Returns false only when the probe could not be sent, i.e. the connection is already broken.
+bool sendProbeConnMsg(SSqlObj *pSql) {
+  if (pSql == NULL) {
+    return true;  // nothing to probe
+  }
+
+  pSql->noAckCnt++;
+  tscDebug("0x%"PRIx64" sendProbeConnMsg noAckCnt:%d", pSql->self, pSql->noAckCnt);
+
+  // TODO: send the probe with rpcSendProbe(pSql->rpcRid, ...) once the context/conn/fd
+  // recorded by saveSendInfo() at request time are kept in SSqlObj. Until then nothing
+  // is sent and the connection is reported as alive.
+  return true;
+}
+
+// Probe the connection of every query (and sub query) linked in pTscObj->sqlList and
+// hand the ones whose probe cannot be sent to dealConnBroken().
+// NOTE(review): sqlList is walked without taking a lock here - confirm the caller's locking.
+void checkBrokenQueries(STscObj *pTscObj) {
+  for (SSqlObj *pSql = pTscObj->sqlList; pSql != NULL; pSql = pSql->next) {
+    int32_t numOfSub = pSql->subState.numOfSub;
+    if (numOfSub == 0) {
+      // no sub sql, probe the query itself
+      if (!sendProbeConnMsg(pSql)) {
+        // send failed, connection already broken
+        dealConnBroken(pSql);
+      }
+      continue;  // keep scanning the remaining queries
+    }
+
+    // have sub sql, probe each of them
+    for (int32_t i = 0; i < numOfSub; i++) {
+      SSqlObj *pSubSql = pSql->pSubs[i];
+      if (!sendProbeConnMsg(pSubSql)) {
+        // send failed, connection already broken
+        dealConnBroken(pSubSql);
+      }
+    }
+  }
+}
+
 void tscProcessActivityTimer(void *handle, void *tmrId) {
   int64_t rid = (int64_t) handle;
   STscObj *pObj = taosAcquireRef(tscRefId, rid);
@@ -296,6 +344,20 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
 
   assert(pHB->self == pObj->hbrid);
 
+  // periodically check whether the executing queries still have a live connection
+  // NOTE(review): the counter is static, so it is shared by the timers of all connections
+  static int activityTimerCnt = 0;
+  if (++activityTimerCnt > 10) {  // 1.5s * 10 = 15s interval call
+    activityTimerCnt = 0;
+
+    // only check when more than one sql object is linked to this connection
+    if (pObj->sqlList && pObj->sqlList->next) {
+      // have queries executing
+      checkBrokenQueries(pObj);
+    }
+  }
+
+  // send self connection and queries
   pHB->retry = 0;
   int32_t code = tscBuildAndSendRequest(pHB, NULL);
   taosReleaseRef(tscObjRef, pObj->hbrid);
@@ -417,6 +479,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
   SSqlRes *pRes = &pSql->res;
   SSqlCmd *pCmd = &pSql->cmd;
 
+  pSql->rpcRid = -1;
   if (pObj->signature != pObj) {
     tscDebug("0x%"PRIx64" DB connection is closed, cmd:%d pObj:%p signature:%p", pSql->self, pCmd->command, pObj, pObj->signature);
 
@@ -427,6 +490,17 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
     return;
   }
 
+  // a probe response only reports that the connection is alive; it carries no query result
+  if (rpcMsg->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) {
+    pSql->noAckCnt = 0;
+    pSql->lastUpdate = taosGetTimestampMs();
+    tscInfo("0x%"PRIx64" recv sql probe msg. sql=%s", pSql->self, pSql->sqlstr ? pSql->sqlstr : "(null)");
+    // NOTE(review): drop the sql object ref and the message body like the other early returns - confirm
+    taosReleaseRef(tscObjRef, pSql->self);
+    rpcFreeCont(rpcMsg->pCont);
+    return;
+  }
+
   SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
   if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) {
     tscDebug("0x%"PRIx64" sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c
index 7676343b37d242c1d174a31959ea4be25a9d5af2..bea325c05342ed71668e5308907f89384d5ce42d 100644
--- a/src/dnode/src/dnodeShell.c
+++ b/src/dnode/src/dnodeShell.c
@@ -78,6 +78,8 @@ int32_t dnodeInitShell() {
 
   dnodeProcessShellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeSendStartupStep;
 
+  dnodeProcessShellMsgFp[TSDB_MSG_TYPE_PROBE_CONN] = dnodeDispatchToVReadQueue;
+
   int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0);
   if (numOfThreads < 1) {
     numOfThreads = 1;
diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c
index c404ab1a55c3788f5756c99f7914764e6e9af295..8eab67209a883f6c462ff828f21a41c444ac9260 100644
--- a/src/dnode/src/dnodeVRead.c
+++ b/src/dnode/src/dnodeVRead.c
@@ -57,6 +57,14 @@ void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) {
   int32_t code = TSDB_CODE_VND_INVALID_VGROUP_ID;
   char *  pCont = pMsg->pCont;
 
+  // a probe carries no vnode request: answer it right away and skip the read queues
+  if (pMsg->msgType == TSDB_MSG_TYPE_PROBE_CONN) {
+    SRpcMsg rpcRsp = {.handle = pMsg->handle, .code = 0, .msgType = TSDB_MSG_TYPE_PROBE_CONN_RSP};
+    rpcSendResponse(&rpcRsp);
+    rpcFreeCont(pMsg->pCont);  // NOTE(review): the message body must still be freed on this path - confirm
+    return;
+  }
+
   while (leftLen > 0) {
     SMsgHead *pHead = (SMsgHead *)pCont;
     pHead->vgId = htonl(pHead->vgId);
diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h
index 1a8907efabc483233f7e12ce2511bfec339a8d6f..280747afed543eaddf2ed657c6067f4aad31a321 100644
--- a/src/inc/taosmsg.h
+++ b/src/inc/taosmsg.h
@@ -123,6 +123,9 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TP, "alter-tp" )
 
 // delete
 TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DELDATA, "delete-data" )
+
+// probe whether a connection is still alive (request / response pair)
+TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_PROBE_CONN, "probe-connection-alive" )
 
 #ifndef TAOS_MESSAGE_C
   TSDB_MSG_TYPE_MAX  // 105
diff --git a/src/inc/trpc.h b/src/inc/trpc.h
index 5e3fbd571ef4b6425f2e5a58c308c8fc9da0b12e..c23959a0cc1e5c7d5c29c4e19701e983850a8327 100644
--- a/src/inc/trpc.h
+++ b/src/inc/trpc.h
@@ -94,6 +94,9 @@ int rpcReportProgress(void *pConn, char *pCont, int contLen);
 void  rpcCancelRequest(int64_t rid);
 int32_t rpcUnusedSession(void * rpcInfo, bool bLock);
 
+bool  rpcSendProbe(int64_t rpcRid, void *pPrevContext, void *pPrevConn, void *pPrevFdObj, int32_t prevFd);
+bool  saveSendInfo(int64_t rpcRid, void **ppContext, void **ppConn, void **ppFdObj, int32_t *pFd);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/src/rpc/inc/rpcTcp.h b/src/rpc/inc/rpcTcp.h
index 6ef8fc2d921a3379532bbc0efd2f226ef3389fc5..20ebec4feca7f7f904fda0423aa79acb0e49e8ab 100644
--- a/src/rpc/inc/rpcTcp.h
+++ b/src/rpc/inc/rpcTcp.h
@@ -32,6 +32,8 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin
 void taosCloseTcpConnection(void *chandle);
 int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle);
 
+int32_t taosGetFdID(void *chandle);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c
index 95931fcbc6f46bac1e535e4684750ab1874e8f0c..3203dc4a73fda318d0cfb88c3e296090042a8e85 100644
--- a/src/rpc/src/rpcMain.c
+++ b/src/rpc/src/rpcMain.c
@@ -196,7 +196,7 @@ static SRpcConn *rpcGetConnObj(SRpcInfo *pRpc, int sid, SRecvInfo *pRecv);
 static void      rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext);
 static void      rpcSendQuickRsp(SRpcConn *pConn, int32_t code);
 static void      rpcSendErrorMsgToPeer(SRecvInfo *pRecv, int32_t code);
-static void      rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen);
+static bool      rpcSendMsgToPeer(SRpcConn *pConn, void *data, int dataLen);
 static void      rpcSendReqHead(SRpcConn *pConn);
 
 static void     *rpcProcessMsgFromPeer(SRecvInfo *pRecv);
@@ -1349,9 +1349,10 @@ static void rpcSendReqToServer(SRpcInfo *pRpc, SRpcReqContext *pContext) {
   rpcUnlockConn(pConn);
 }
 
-static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
+static bool rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
   int        writtenLen = 0;
   SRpcHead  *pHead = (SRpcHead *)msg;
+  bool       ret = true;
 
   msgLen = rpcAddAuthPart(pConn, msg, msgLen);
 
@@ -1371,9 +1372,11 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
 
   if (writtenLen != msgLen) {
     tError("%s, failed to send, msgLen:%d written:%d, reason:%s", pConn->info, msgLen, writtenLen, strerror(errno));
+    ret = false;
   }
 
   tDump(msg, msgLen);
+  return ret;
 }
 
 static void rpcProcessConnError(void *param, void *id) {
@@ -1683,4 +1686,111 @@ int32_t rpcUnusedSession(void * rpcInfo, bool bLock) {
   if(info == NULL)
     return 0;
   return taosIdPoolNumOfFree(info->idPool, bLock);
-}
\ No newline at end of file
+}
+
+// Build a header-only TSDB_MSG_TYPE_PROBE_CONN message for pConn and send it to the peer.
+// Returns the result of rpcSendMsgToPeer(): true when the whole message was written.
+// NOTE(review): the connection is not locked here - confirm whether rpcLockConn() is needed.
+static bool doRpcSendProbe(SRpcConn *pConn) {
+  char      msg[RPC_MSG_OVERHEAD];
+  SRpcHead *pHead = (SRpcHead *)msg;
+
+  // set msg header
+  memset(msg, 0, sizeof(SRpcHead));
+  pHead->version = 1;
+  pHead->msgType = TSDB_MSG_TYPE_PROBE_CONN;
+  pHead->spi = pConn->spi;
+  pHead->encrypt = 0;
+  pHead->tranId = pConn->inTranId;
+  pHead->sourceId = pConn->ownId;
+  pHead->destId = pConn->peerId;
+  pHead->linkUid = pConn->linkUid;
+  pHead->ahandle = (uint64_t)pConn->ahandle;
+  memcpy(pHead->user, pConn->user, tListLen(pHead->user));
+  pHead->code = htonl(0);  // a probe never carries an error code
+
+  return rpcSendMsgToPeer(pConn, msg, sizeof(SRpcHead));
+}
+
+// Send a probe on the connection that carries request rpcRid.
+// The probe is only sent when the request context, connection, fd object and fd are still
+// the ones passed in by the caller; otherwise the request is no longer on that link.
+// Returns true when the probe was written to the connection, false otherwise.
+bool rpcSendProbe(int64_t rpcRid, void* pPrevContext, void* pPrevConn, void* pPrevFdObj, int32_t prevFd) {
+  bool ret = false;
+  if (rpcRid < 0) {
+    tError("ACK rpcRid=%" PRId64 " less than zero, invalid.", rpcRid);
+    return false;
+  }
+
+  // get req content
+  SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, rpcRid);
+  if (pContext == NULL) {
+    tError("ACK rpcRid=%" PRId64 " get context NULL.", rpcRid);
+    return false;
+  }
+
+  SRpcConn *pConn = pContext->pConn;
+  int32_t   fd = -1;
+
+  // context same
+  if (pContext != pPrevContext) {
+    tError("ACK rpcRid=%" PRId64 " context diff. pContext=%p pPreContent=%p", rpcRid, pContext, pPrevContext);
+    goto _END;
+  }
+
+  // conn same, and there must be a connection to probe
+  if (pConn == NULL || pConn != pPrevConn) {
+    tError("ACK rpcRid=%" PRId64 " connect obj diff. pContext->pConn=%p pPreConn=%p", rpcRid, pConn, pPrevConn);
+    goto _END;
+  }
+
+  // fdObj same
+  if (pConn->chandle != pPrevFdObj) {
+    tError("ACK rpcRid=%" PRId64 " connect fdObj diff. pContext->pConn->chandle=%p pPrevFdObj=%p", rpcRid, pConn->chandle, pPrevFdObj);
+    goto _END;
+  }
+
+  // fd same
+  fd = taosGetFdID(pConn->chandle);
+  if (fd != prevFd) {
+    tError("ACK rpcRid=%" PRId64 " connect fd diff.fd=%d prevFd=%d", rpcRid, fd, prevFd);
+    goto _END;
+  }
+
+  // send syn
+  ret = doRpcSendProbe(pConn);
+
+_END:
+  // put back req context
+  taosReleaseRef(tsRpcRefId, rpcRid);
+  return ret;
+}
+
+// Record which context, connection, fd object and fd currently carry request rpcRid, so
+// that rpcSendProbe() can later verify the request is still on the same link.
+// Returns false, leaving the outputs untouched, when the request has no context or
+// connection.
+bool saveSendInfo(int64_t rpcRid, void** ppContext, void** ppConn, void** ppFdObj, int32_t* pFd) {
+  if (rpcRid < 0 || ppContext == NULL || ppConn == NULL || ppFdObj == NULL || pFd == NULL) {
+    return false;
+  }
+
+  SRpcReqContext *pContext = taosAcquireRef(tsRpcRefId, rpcRid);
+  if (pContext == NULL) {
+    return false;
+  }
+
+  bool      ret = false;
+  SRpcConn *pConn = pContext->pConn;
+  if (pConn != NULL) {
+    *ppContext = pContext;
+    *ppConn = pConn;
+    *ppFdObj = pConn->chandle;
+    *pFd = taosGetFdID(pConn->chandle);
+    ret = true;
+  }
+
+  taosReleaseRef(tsRpcRefId, rpcRid);
+  return ret;
+}
diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c
index 740a1e2b7d2784347b19be328319fc19f417f25d..001c50ee5dedae5cac9f3b87a8055b97b5af7984 100644
--- a/src/rpc/src/rpcTcp.c
+++ b/src/rpc/src/rpcTcp.c
@@ -674,3 +674,13 @@ static void taosFreeFdObj(SFdObj *pFdObj) {
 
   tfree(pFdObj);
 }
+
+// Return the fd stored in the fd object behind chandle, or -1 when chandle is NULL
+// or fails the signature self-check.
+int32_t taosGetFdID(void *chandle) {
+  SFdObj *pFdObj = chandle;
+  if (pFdObj == NULL || pFdObj->signature != pFdObj) {
+    return -1;
+  }
+  return pFdObj->fd;
+}