diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h
index 85bdfa43fdf164ac2a3a51b5d8bba047821267c0..39a6869ee4a72d3dc5e0b356d66734c5a9287aca 100644
--- a/src/client/inc/tsclient.h
+++ b/src/client/inc/tsclient.h
@@ -396,6 +396,10 @@ typedef struct SSqlObj {
   int32_t retryReason; // previous error code
   struct SSqlObj *prev, *next;
   int64_t self;
+
+  // connect alive
+  int64_t lastAlive;
+  void * pPrevContext;
 } SSqlObj;
 
 typedef struct SSqlStream {
diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c
index 911da05b4379ad871a09db6f7b87d26712051e02..04ad4a1cb62e763899b40f2800dc540db76ca136 100644
--- a/src/client/src/tscAsync.c
+++ b/src/client/src/tscAsync.c
@@ -312,7 +312,11 @@ static void tscAsyncResultCallback(SSchedMsg *pMsg) {
     return;
   }
 
-  assert(pSql->res.code != TSDB_CODE_SUCCESS);
+  // probe send error , but result be responsed by server async
+  if(pSql->res.code == TSDB_CODE_SUCCESS) {
+    return ;
+  }
+
   tscError("0x%"PRIx64" async result callback, code:%s", pSql->self, tstrerror(pSql->res.code));
 
   SSqlRes *pRes = &pSql->res;
diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c
index 317b179ea43c933d9a4d2e7f73a91d6ea1a51c87..227f89b6058698038396690ab363a7a19a24685b 100644
--- a/src/client/src/tscServer.c
+++ b/src/client/src/tscServer.c
@@ -281,6 +281,128 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
   }
 }
 
+/**
+ * Decide whether the connection of a running query needs a probe and, if it
+ * does, send one.
+ *
+ * The idle time is measured from the later of `stime` and pSql->lastAlive:
+ *  - shorter than tsProbeSeconds: nothing is sent;
+ *  - longer than tsProbeKillSeconds: nothing is sent and the caller is told
+ *    to kill the query;
+ *  - otherwise a probe is sent with rpcSendProbe(), provided pPrevContext has
+ *    been saved and the request is still outstanding (rpcRid != -1).
+ *
+ * @param pSql  query or sub-query whose connection is checked
+ * @param stime start time (ms) to measure from; 0 means the query has not started
+ * @return false when the query should be killed (kill timeout exceeded, or
+ *         rpcSendProbe() returned false); true in every other case
+ */
+bool sendProbeConnMsg(SSqlObj* pSql, int64_t stime) {
+  if (stime == 0) {
+    // not started yet, nothing to probe
+    tscInfo("PROBE 0x%" PRIx64 " not start, no need probe.", pSql->self);
+    return true;
+  }
+
+  // keep the elapsed milliseconds in 64 bits: narrowed to int32_t the value no
+  // longer fits after about 24.8 days, which would skip the kill check below
+  int64_t start = MAX(stime, pSql->lastAlive);
+  int64_t diff = taosGetTimestampMs() - start;
+  if (diff < (int64_t)tsProbeSeconds * 1000) {
+    // idle time is still below the probe threshold
+    tscInfo("PROBE 0x%" PRIx64 " not arrived probe time. cfg timeout=%ds, no need probe. lastAlive=%" PRId64 " stime=%" PRId64, \
+        pSql->self, tsProbeSeconds, pSql->lastAlive, stime);
+    return true;
+  }
+
+  if (diff > (int64_t)tsProbeKillSeconds * 1000) {
+    // idle for longer than the kill timeout: ask the caller to kill the query
+    tscInfo("PROBE 0x%" PRIx64 " kill query by probe. because arrived kill time. time=%" PRId64 "s cfg timeout=%ds lastAlive=%" PRId64 " stime=%" PRId64, \
+        pSql->self, diff / 1000, tsProbeKillSeconds, pSql->lastAlive, stime);
+
+    return false;
+  }
+
+  if (pSql->pPrevContext == NULL) {
+    // connection info of the last send was not saved, so a probe cannot be sent
+    tscInfo("PROBE 0x%" PRIx64 " save last connect info uncompletely. prev context is null", pSql->self);
+    return true;
+  }
+
+  if (pSql->rpcRid == -1) {
+    // request was cancelled or already answered by the server, nothing to probe
+    tscInfo("PROBE 0x%" PRIx64 " rpcRid is -1, response ok. no need probe.", pSql->self);
+    return true;
+  }
+
+  bool ret = rpcSendProbe(pSql->rpcRid, pSql->pPrevContext);
+  tscInfo("PROBE 0x%" PRIx64 " send probe msg, ret=%d rpcRid=0x%" PRIx64, pSql->self, ret, pSql->rpcRid);
+  return ret;
+}
+
+/**
+ * Walk every query on pTscObj->sqlList, probe its connection (or those of its
+ * sub-queries) with sendProbeConnMsg(), and stop each query for which a probe
+ * call returns false.
+ *
+ * NOTE(review): the list is walked here without taking a lock on pTscObj;
+ * confirm that the caller serializes this against list insertion/removal.
+ *
+ * @param pTscObj connection object whose queries are checked
+ */
+void checkBrokenQueries(STscObj *pTscObj) {
+  tscDebug("PROBE checkBrokenQueries pTscObj=%p pTscObj->rid=0x%" PRIx64, pTscObj, pTscObj->rid);
+  SSqlObj *pSql = pTscObj->sqlList;
+  while (pSql) {
+    // skip entries without sql text: such an object may not have been removed from the list correctly
+    if (pSql->sqlstr == NULL) {
+      pSql = pSql->next;
+      continue;
+    }
+
+    bool kill = false;
+    int32_t numOfSub = pSql->subState.numOfSub;
+    tscInfo("PROBE 0x%" PRIx64 " start checking sql alive, numOfSub=%d sql=%s stime=%" PRId64 " alive=%" PRId64 " rpcRid=0x%" PRIx64 \
+        ,pSql->self, numOfSub, pSql->sqlstr == NULL ? "" : pSql->sqlstr, pSql->stime, pSql->lastAlive, pSql->rpcRid);
+    if (numOfSub == 0) {
+      // no sub-queries: probe the query's own connection
+      if(!sendProbeConnMsg(pSql, pSql->stime)) {
+        // need kill
+        tscInfo("PROBE 0x%" PRIx64 " need break link done. rpcRid=0x%" PRIx64, pSql->self, pSql->rpcRid);
+        kill = true;
+      }
+    } else {
+      // take the sub-state mutex before walking pSubs
+      pthread_mutex_lock(&pSql->subState.mutex);
+      if (pSql->pSubs) {
+        // probe each sub-query, timed from the parent's start time
+        for (int i = 0; i < numOfSub; i++) {
+          SSqlObj *pSubSql = pSql->pSubs[i];
+          if(pSubSql) {
+            tscInfo("PROBE 0x%" PRIx64 " sub sql app is 0x%" PRIx64, pSql->self, pSubSql->self);
+            if(!sendProbeConnMsg(pSubSql, pSql->stime)) {
+              // one failing sub-query is enough: stop probing and kill the parent
+              tscInfo("PROBE 0x%" PRIx64 " i=%d sub app=0x%" PRIx64 " need break link done. rpcRid=0x%" PRIx64, pSql->self, i, pSubSql->self, pSubSql->rpcRid);
+              kill = true;
+              break;
+            }
+          }
+        }
+      }
+      // unlock
+      pthread_mutex_unlock(&pSql->subState.mutex);
+    }
+
+    // kill query
+    if(kill) {
+      taos_stop_query(pSql);
+    }
+
+    // move next
+    pSql = pSql->next;
+  }
+}
+
 void tscProcessActivityTimer(void *handle, void *tmrId) {
   int64_t rid = (int64_t) handle;
   STscObj *pObj = taosAcquireRef(tscRefId, rid);
@@ -296,6 +418,18 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
 
   assert(pHB->self == pObj->hbrid);
 
+  // check queries already death
+  static int activetyCnt = 0;
+  if (++activetyCnt > tsProbeInterval) { // 1.5s * 40 = 60s interval call check queries alive
+    activetyCnt = 0;
+
+    // call check if have query doing
+    if(pObj->sqlList) {
+      // have queries executing
+      checkBrokenQueries(pObj);
+    }
+  }
+
   pHB->retry = 0;
   int32_t code = tscBuildAndSendRequest(pHB, NULL);
   taosReleaseRef(tscObjRef, pObj->hbrid);
diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h
index f462cc1037f972328da01969aa4b420ad225407e..589b0803d0ef7cf8720713243891d1dad46bbcc0 100644
--- a/src/common/inc/tglobal.h
+++ b/src/common/inc/tglobal.h
@@ -227,6 +227,11 @@ extern int32_t debugFlag;
 
 extern int8_t tsClientMerge;
 
+// probe alive connection
+extern int32_t tsProbeSeconds;
+extern int32_t tsProbeKillSeconds;
+extern int32_t tsProbeInterval;
+
 #ifdef TD_TSZ
 // lossy
 extern char lossyColumns[];
diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c
index 
81224d8d917b961848dd2fedd4a907cb15de20a1..c8f73053a43ee14265f57406ec99f1e141109577 100644
--- a/src/common/src/tglobal.c
+++ b/src/common/src/tglobal.c
@@ -277,6 +277,11 @@ int32_t fsDebugFlag = 135;
 
 int8_t tsClientMerge = 0;
 
+// connection-alive probing of running queries
+int32_t tsProbeSeconds = 5 * 60; // seconds since the later of query start and last alive time before probing starts
+int32_t tsProbeKillSeconds = 10 * 60; // seconds since the later of query start and last alive time before the query is killed
+int32_t tsProbeInterval = 40; // activity-timer ticks between liveness checks (author's note: 40 * 1.5s = 60s)
+
 #ifdef TD_TSZ
 //
 // lossy compress 6
@@ -1782,6 +1787,39 @@ static void doInitGlobalConfig(void) {
   cfg.unitType = TAOS_CFG_UTYPE_MB;
   taosInitConfigOption(cfg);
 
+  // probeSeconds: client option backing tsProbeSeconds
+  cfg.option = "probeSeconds";
+  cfg.ptr = &tsProbeSeconds;
+  cfg.valType = TAOS_CFG_VTYPE_INT32;
+  cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT;
+  cfg.minValue = 0;
+  cfg.maxValue = 100000;
+  cfg.ptrLength = 0;
+  cfg.unitType = TAOS_CFG_UTYPE_NONE;
+  taosInitConfigOption(cfg);
+
+  // probeKillSeconds: client option backing tsProbeKillSeconds
+  cfg.option = "probeKillSeconds";
+  cfg.ptr = &tsProbeKillSeconds;
+  cfg.valType = TAOS_CFG_VTYPE_INT32;
+  cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT;
+  cfg.minValue = 0;
+  cfg.maxValue = 100000;
+  cfg.ptrLength = 0;
+  cfg.unitType = TAOS_CFG_UTYPE_NONE;
+  taosInitConfigOption(cfg);
+
+  // probeInterval: client option backing tsProbeInterval
+  cfg.option = "probeInterval";
+  cfg.ptr = &tsProbeInterval;
+  cfg.valType = TAOS_CFG_VTYPE_INT32;
+  cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT;
+  cfg.minValue = 0;
+  cfg.maxValue = 100000;
+  cfg.ptrLength = 0;
+  cfg.unitType = TAOS_CFG_UTYPE_NONE;
+  taosInitConfigOption(cfg);
+
 #ifdef TD_TSZ
   // lossy compress
   cfg.option = "lossyColumns";
diff --git a/src/dnode/inc/dnodeVRead.h b/src/dnode/inc/dnodeVRead.h
index 9c88886f88bedaa63a5071b9dd2d773d4ff1cc0c..e816a897e46474eb7c0193568e93db007733ff73 100644
--- a/src/dnode/inc/dnodeVRead.h
+++ 
b/src/dnode/inc/dnodeVRead.h
@@ -29,6 +29,9 @@ void * dnodeAllocVFetchQueue(void *pVnode);
 void dnodeFreeVQueryQueue(void *pQqueue);
 void dnodeFreeVFetchQueue(void *pFqueue);
 
+// respond to a connection probe message
+void dnodeResponseProbeMsg(SRpcMsg *pMsg);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c
index 7676343b37d242c1d174a31959ea4be25a9d5af2..ec7f589e7ddb1e8bbb80f3abb5d1b93332bcd6cd 100644
--- a/src/dnode/src/dnodeShell.c
+++ b/src/dnode/src/dnodeShell.c
@@ -77,6 +77,7 @@ int32_t dnodeInitShell() {
   dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE_FUNC] = dnodeDispatchToMReadQueue;
 
   dnodeProcessShellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeSendStartupStep;
+  dnodeProcessShellMsgFp[TSDB_MSG_TYPE_PROBE_CONN] = dnodeResponseProbeMsg;
 
   int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0);
   if (numOfThreads < 1) {
diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c
index c404ab1a55c3788f5756c99f7914764e6e9af295..0b0bf29e504e1779ebe7921d8d374b1b31729b7a 100644
--- a/src/dnode/src/dnodeVRead.c
+++ b/src/dnode/src/dnodeVRead.c
@@ -152,3 +152,23 @@ static void *dnodeProcessReadQueue(void *wparam) {
 
   return NULL;
 }
+
+/**
+ * Answer a connection probe sent by a client.
+ *
+ * Replies on the handle the probe arrived on with a body-less
+ * TSDB_MSG_TYPE_PROBE_CONN_RSP (code 0) and then releases the request body.
+ *
+ * @param pMsg incoming message; only TSDB_MSG_TYPE_PROBE_CONN is answered
+ */
+void dnodeResponseProbeMsg(SRpcMsg *pMsg) {
+  if (pMsg->msgType == TSDB_MSG_TYPE_PROBE_CONN) {
+    SRpcMsg rpcRsp = {.handle = pMsg->handle, .code = 0, .msgType = TSDB_MSG_TYPE_PROBE_CONN_RSP};
+    rpcSendResponse(&rpcRsp);
+  }
+
+  // NOTE(review): shell message handlers are expected to release the request
+  // content themselves (confirm against the dispatcher in dnodeShell.c);
+  // without this call every probe leaks its receive buffer
+  rpcFreeCont(pMsg->pCont);
+}
diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h
index 26ce551e397fccfe6eb378aa0de2de771dfae10f..f901025ca6f3eeb7a29135c3a0f43054184c0748 100644
--- a/src/inc/taosmsg.h
+++ b/src/inc/taosmsg.h
@@ -120,6 +120,9 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TP, "drop-tp" )
 TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_USE_TP, "use-tp" )
 TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TP, "alter-tp" )
 
+// syn -> ack probe connection msg
+TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_PROBE_CONN, "probe-connection-alive" )
+
 #ifndef TAOS_MESSAGE_C
   TSDB_MSG_TYPE_MAX // 105
 #endif
diff --git a/src/inc/trpc.h b/src/inc/trpc.h
index 0ce2e3da14d1cec204fc755db13da53f08295bff..87bd1e781f21f2d1d8741bedf8526f384b25a5a0 100644
--- a/src/inc/trpc.h
+++ b/src/inc/trpc.h
@@ -85,13 +85,17 @@ void rpcClose(void *);
 void *rpcMallocCont(int contLen);
 void rpcFreeCont(void *pCont);
 void *rpcReallocCont(void *ptr, int contLen);
-void rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid);
+bool rpcSendRequest(void *thandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid);
 void rpcSendResponse(const SRpcMsg *pMsg);
 void rpcSendRedirectRsp(void *pConn, const SRpcEpSet *pEpSet);
 int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
 void rpcSendRecv(void *shandle, SRpcEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
 int rpcReportProgress(void *pConn, char *pCont, int contLen);
 void rpcCancelRequest(int64_t rid);
+// send a connection-alive probe message for the request identified by rpcRid
+bool rpcSendProbe(int64_t rpcRid, void* pPrevContext);
+// save the connection info of a sql request after it has been sent (NOTE(review): presumably handed back through ppContext - confirm)
+bool rpcSaveSendInfo(int64_t rpcRid, void** ppContext);
 
 #ifdef __cplusplus
 }
diff --git a/src/rpc/inc/rpcTcp.h b/src/rpc/inc/rpcTcp.h
index 6ef8fc2d921a3379532bbc0efd2f226ef3389fc5..a47fa39ceb3b3a642c7839902885cbc8728abe16 100644
--- a/src/rpc/inc/rpcTcp.h
+++ b/src/rpc/inc/rpcTcp.h
@@ -32,6 +32,8 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin
 void taosCloseTcpConnection(void *chandle);
 int taosSendTcpData(uint32_t ip, uint16_t port, void *data, int len, void *chandle);
 
+SOCKET taosGetFdID(void *chandle);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c
index 5cfa1674ca9c3168ab2dc25ee09c57b53a787b1d..402b4cb4a628fb632f502ae3b11e1248e4aab7de 100644
--- a/src/rpc/src/rpcMain.c
+++ b/src/rpc/src/rpcMain.c
@@ -69,6 +69,13 @@ typedef struct {
   struct SRpcConn 
*connList; // connection list
 } SRpcInfo;
 
+typedef struct SSendInfo {
+  void *pContext;
+  void *pConn;
+  void *pFdObj;
+  SOCKET fd;
+} SSendInfo;
+
 typedef struct {
   SRpcInfo *pRpc; // associated SRpcInfo
   SRpcEpSet epSet; // ip list provided by app
diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c
index 740a1e2b7d2784347b19be328319fc19f417f25d..ee8c2c2f024b62091fa5ce290e488934f2693276 100644
--- a/src/rpc/src/rpcTcp.c
+++ b/src/rpc/src/rpcTcp.c
@@ -674,3 +674,13 @@ static void taosFreeFdObj(SFdObj *pFdObj) {
 
   tfree(pFdObj);
 }
+
+// Return the socket fd stored in a TCP connection handle, or -1 when the
+// handle is NULL or its signature field does not point back at the handle.
+SOCKET taosGetFdID(void *chandle) {
+  SFdObj *pObj = chandle;
+  if (pObj == NULL || pObj->signature != pObj) {
+    return -1;
+  }
+  return pObj->fd;
+}
\ No newline at end of file
diff --git a/src/util/inc/tconfig.h b/src/util/inc/tconfig.h
index acff4168cfbbd2c1b8578fe0075ac173bfd43beb..f207b9b5c4dd270cc4364e548449edcbee43ae4a 100644
--- a/src/util/inc/tconfig.h
+++ b/src/util/inc/tconfig.h
@@ -20,7 +20,7 @@
 extern "C" {
 #endif
 
-#define TSDB_CFG_MAX_NUM 135
+#define TSDB_CFG_MAX_NUM 138
 #define TSDB_CFG_PRINT_LEN 23
 #define TSDB_CFG_OPTION_LEN 24
 #define TSDB_CFG_VALUE_LEN 41