From 07e2f9611516f92fc6b22baef67921dc55e93898 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 13 Aug 2020 23:29:13 +0800 Subject: [PATCH] [td-225] fix error in closing the conns. --- src/client/inc/tscUtil.h | 2 +- src/client/src/tscServer.c | 79 ++++++++++++++++++++++---------------- src/client/src/tscSql.c | 9 ++++- src/client/src/tscUtil.c | 2 +- 4 files changed, 54 insertions(+), 38 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index af91ac34f0..2ca6ba6691 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -210,7 +210,7 @@ void tscTagCondRelease(STagCond* pCond); void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo); void tscSetFreeHeatBeat(STscObj* pObj); -bool tscShouldFreeHeatBeat(SSqlObj* pHb); +bool tscShouldFreeHeartBeat(SSqlObj* pHb); bool tscShouldBeFreed(SSqlObj* pSql); STableMetaInfo* tscGetTableMetaInfoFromCmd(SSqlCmd *pCmd, int32_t subClauseIndex, int32_t tableIndex); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 0613f35242..6b75b680b1 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -171,46 +171,23 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { void tscProcessActivityTimer(void *handle, void *tmrId) { STscObj *pObj = (STscObj *)handle; + if (pObj == NULL || pObj->signature != pObj) { + return; + } - if (pObj == NULL) return; - if (pObj->signature != pObj) return; - if (pObj->pTimer != tmrId) return; - - if (pObj->pHb == NULL) { - SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); - if (NULL == pSql) return; - - pSql->fp = tscProcessHeartBeatRsp; - - SQueryInfo *pQueryInfo = NULL; - tscGetQueryInfoDetailSafely(&pSql->cmd, 0, &pQueryInfo); - pQueryInfo->command = TSDB_SQL_HB; - - pSql->cmd.command = TSDB_SQL_HB; - if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) { - taosTFree(pSql); - return; - } - - pSql->cmd.command = TSDB_SQL_HB; - pSql->param = pObj; - pSql->pTscObj = pObj; - pSql->signature = pSql; - pObj->pHb = pSql; - tscAddSubqueryInfo(&pObj->pHb->cmd); - - tscDebug("%p HB is allocated, pObj:%p", pObj->pHb, pObj); + SSqlObj* pHB = pObj->pHb; + if (pObj->pTimer != tmrId || pHB == NULL) { + return; } - if (tscShouldFreeHeatBeat(pObj->pHb)) { - tscDebug("%p free HB object and release connection", pObj->pHb); - tscFreeSqlObj(pObj->pHb); + if (tscShouldFreeHeartBeat(pHB)) { + tscDebug("%p free HB object and release connection", pHB); + tscFreeSqlObj(pHB); tscCloseTscObj(pObj); } else { -// taosMsleep(500); - int32_t code = tscProcessSql(pObj->pHb); + int32_t code = tscProcessSql(pHB); if (code != TSDB_CODE_SUCCESS) { - tscError("%p failed to sent HB to server, reason:%s", pObj->pHb, tstrerror(code)); + tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code)); } } } @@ -268,6 +245,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { return; } + pSql->pRpcCtx = NULL; // clear the rpcCtx + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) { tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p", @@ -1956,6 +1935,35 @@ int tscProcessShowRsp(SSqlObj *pSql) { return 0; } +static void createHBObj(STscObj* pObj) { + if (pObj->pHb != NULL) { + return; + } + + SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); + if (NULL == pSql) return; + + pSql->fp = tscProcessHeartBeatRsp; + + SQueryInfo *pQueryInfo = NULL; + tscGetQueryInfoDetailSafely(&pSql->cmd, 0, &pQueryInfo); + pQueryInfo->command = TSDB_SQL_HB; + + pSql->cmd.command = pQueryInfo->command; + if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) { + taosTFree(pSql); + return; + } + + pSql->param = pObj; + pSql->pTscObj = pObj; + pSql->signature = pSql; + pObj->pHb = pSql; + tscAddSubqueryInfo(&pObj->pHb->cmd); + + tscDebug("%p HB is allocated, pObj:%p", pObj->pHb, pObj); +} + int tscProcessConnectRsp(SSqlObj *pSql) { char temp[TSDB_TABLE_FNAME_LEN * 2]; STscObj *pObj = pSql->pTscObj; @@ -1977,6 +1985,9 @@ int tscProcessConnectRsp(SSqlObj *pSql) { pObj->writeAuth = pConnect->writeAuth; pObj->superAuth = pConnect->superAuth; pObj->connId = htonl(pConnect->connId); + + createHBObj(pObj); + taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer); return 0; diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 94ab3a0c75..29c8aa0a56 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -216,10 +216,15 @@ void taos_close(TAOS *taos) { } if (pObj->pHb != NULL) { + if (pObj->pHb->pRpcCtx != NULL) { // wait for rsp from dnode + rpcCancelRequest(pObj->pHb->pRpcCtx); + } + tscSetFreeHeatBeat(pObj); - } else { - tscCloseTscObj(pObj); + tscFreeSqlObj(pObj->pHb); } + + tscCloseTscObj(pObj); } void waitForQueryRsp(void *param, TAOS_RES *tres, int code) { diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 60723676df..1483f0de48 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1394,7 +1394,7 @@ void tscSetFreeHeatBeat(STscObj* pObj) { pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE; } -bool tscShouldFreeHeatBeat(SSqlObj* pHb) { +bool tscShouldFreeHeartBeat(SSqlObj* pHb) { assert(pHb == pHb->signature); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pHb->cmd, 0); -- GitLab