From 962572266335acca4758f8d1eef7e18e95b2b6e7 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Mon, 25 Jul 2022 20:27:45 +0800 Subject: [PATCH] fix(rpc): add global config and broken link method --- src/client/src/tscServer.c | 32 +++++++++++++++++++++++++------- src/common/inc/tglobal.h | 4 ++++ src/common/src/tglobal.c | 27 +++++++++++++++++++++++++++ src/dnode/inc/dnodeVRead.h | 2 -- src/dnode/src/dnodeShell.c | 2 -- src/dnode/src/dnodeVRead.c | 12 +----------- src/util/inc/tconfig.h | 2 +- 7 files changed, 58 insertions(+), 23 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index b626a5355e..2bd2f9bd4c 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -283,16 +283,34 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { // pSql connection link is broken bool dealConnBroken(SSqlObj * pSql) { - // TODO + // check valid + if (pSql == NULL || pSql->signature != pSql) { + return false; + } + if (pSql->cmd.command >= TSDB_SQL_LOCAL) { + return false; + } + + // cancel + if (pSql->rpcRid > 0) { + tscDebug("PROBE 0x%" PRIx64 " rpc cancel request rpcRid=0x%" PRIx64 ".", pSql->self, pSql->rpcRid); + rpcCancelRequest(pSql->rpcRid); + pSql->rpcRid = -1; + } + + // error + tscDebug("PROBE 0x%"PRIx64" async result error.", pSql->self); + tscAsyncResultOnError(pSql); return true; } // if return true, send probe connection msg to sever ok bool sendProbeConnMsg(SSqlObj* pSql) { - // check time out - int32_t probeTimeout = 1*1000; // over this value send probe msg - int32_t killTimeout = 3*60*1000; // over this value query can be killed + // TEST TODO DELETE + tsProbeSeconds = 1; // over this value send probe msg + tsProbeKillSeconds = 3*60; // over this value query can be killed + if(pSql->stime == 0) { // not start , no need probe return true; @@ -300,12 +318,12 @@ bool sendProbeConnMsg(SSqlObj* pSql) { int64_t stime = MAX(pSql->stime, pSql->lastAlive); int32_t diff = (int32_t)(taosGetTimestampMs() - stime); - if (diff < probeTimeout) { + if (diff < tsProbeSeconds * 1000) { // exec time short , need not probe alive return true; } - if (diff > killTimeout) { + if (diff > tsProbeKillSeconds * 1000) { // need kill query tscDebug("PROBE 0x%"PRIx64" need killed, noAckCnt:%d diff=%d", pSql->self, pSql->noAckCnt, diff); //return false; @@ -529,7 +547,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { if(rpcMsg->msgType == TSDB_MSG_TYPE_PROBE_CONN_RSP) { pSql->noAckCnt = 0; pSql->lastAlive = taosGetTimestampMs(); - tscInfo(" recv sql probe msg. sql=%s", pSql->sqlstr); + tscDebug(" PROBE %p recv probe msg. sql=%s", pSql->self, pSql->sqlstr); return ; } diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index 3add0b566d..7011ac06ec 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -218,6 +218,10 @@ extern int32_t debugFlag; extern int8_t tsClientMerge; +// probe alive connection +extern int32_t tsProbeSeconds; +extern int32_t tsProbeKillSeconds; + #ifdef TD_TSZ // lossy extern char lossyColumns[]; diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index 44a53efd8c..dedc81fdb4 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -268,6 +268,11 @@ int32_t fsDebugFlag = 135; int8_t tsClientMerge = 0; +// probe alive connection +int32_t tsProbeSeconds = 10 * 60; // start probe link alive after tsProbeSeconds from starting query +int32_t tsProbeKillSeconds = 30 * 60; // start kill query after tsProbeKillSeconds from starting query + + #ifdef TD_TSZ // // lossy compress 6 @@ -1733,6 +1738,28 @@ static void doInitGlobalConfig(void) { cfg.unitType = TAOS_CFG_UTYPE_NONE; taosInitConfigOption(cfg); + // probeSeconds + cfg.option = "probeSeconds"; + cfg.ptr = &tsProbeSeconds; + cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT; + cfg.minValue = 0; + cfg.maxValue = 0; + cfg.ptrLength = 0; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); + + // probeKillSeconds + cfg.option = "probeKillSeconds"; + cfg.ptr = &tsProbeKillSeconds; + cfg.valType = TAOS_CFG_VTYPE_INT32; + cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLIENT; + cfg.minValue = 0; + cfg.maxValue = 0; + cfg.ptrLength = 0; + cfg.unitType = TAOS_CFG_UTYPE_NONE; + taosInitConfigOption(cfg); + #ifdef TD_TSZ // lossy compress cfg.option = "lossyColumns"; diff --git a/src/dnode/inc/dnodeVRead.h b/src/dnode/inc/dnodeVRead.h index b467e93885..9171026f21 100644 --- a/src/dnode/inc/dnodeVRead.h +++ b/src/dnode/inc/dnodeVRead.h @@ -28,8 +28,6 @@ void * dnodeAllocVQueryQueue(void *pVnode); void * dnodeAllocVFetchQueue(void *pVnode); void dnodeFreeVQueryQueue(void *pQqueue); void dnodeFreeVFetchQueue(void *pFqueue); -// reponse probe connection msg -void dnodeResponseProbeMsg(SRpcMsg *pMsg); diff --git a/src/dnode/src/dnodeShell.c b/src/dnode/src/dnodeShell.c index 2bf67f9a5c..26a2fc9651 100644 --- a/src/dnode/src/dnodeShell.c +++ b/src/dnode/src/dnodeShell.c @@ -78,8 +78,6 @@ int32_t dnodeInitShell() { dnodeProcessShellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeSendStartupStep; - dnodeProcessShellMsgFp[TSDB_MSG_TYPE_PROBE_CONN] = dnodeResponseProbeMsg; - int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0); if (numOfThreads < 1) { numOfThreads = 1; diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c index 8a9cca4f5f..6b2a0ab8e0 100644 --- a/src/dnode/src/dnodeVRead.c +++ b/src/dnode/src/dnodeVRead.c @@ -155,14 +155,4 @@ static void *dnodeProcessReadQueue(void *wparam) { } return NULL; -} - -// reponse probe connection msg -void dnodeResponseProbeMsg(SRpcMsg *pMsg) { - // check probe conn msg - if(pMsg->msgType == TSDB_MSG_TYPE_PROBE_CONN) { - SRpcMsg rpcRsp = {.handle = pMsg->handle, .code = 0, .msgType = TSDB_MSG_TYPE_PROBE_CONN_RSP}; - rpcSendResponse(&rpcRsp); - return ; - } -} +} \ No newline at end of file diff --git a/src/util/inc/tconfig.h b/src/util/inc/tconfig.h index fd9a340a25..c0a4986936 100644 --- a/src/util/inc/tconfig.h +++ b/src/util/inc/tconfig.h @@ -20,7 +20,7 @@ extern "C" { #endif -#define TSDB_CFG_MAX_NUM 131 +#define TSDB_CFG_MAX_NUM 133 #define TSDB_CFG_PRINT_LEN 23 #define TSDB_CFG_OPTION_LEN 24 #define TSDB_CFG_VALUE_LEN 41 -- GitLab