diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index af91ac34f08658da67644d7f72209231b99eaeaf..2ca6ba669185d5bc8123d21e64805e65a8718945 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -210,7 +210,7 @@ void tscTagCondRelease(STagCond* pCond); void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo); void tscSetFreeHeatBeat(STscObj* pObj); -bool tscShouldFreeHeatBeat(SSqlObj* pHb); +bool tscShouldFreeHeartBeat(SSqlObj* pHb); bool tscShouldBeFreed(SSqlObj* pSql); STableMetaInfo* tscGetTableMetaInfoFromCmd(SSqlCmd *pCmd, int32_t subClauseIndex, int32_t tableIndex); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 0613f3524271ed0474f16ff1437ea61aea35cab7..6b75b680b10858ecb6789aba6a0f92aa61c3a38a 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -171,46 +171,23 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { void tscProcessActivityTimer(void *handle, void *tmrId) { STscObj *pObj = (STscObj *)handle; + if (pObj == NULL || pObj->signature != pObj) { + return; + } - if (pObj == NULL) return; - if (pObj->signature != pObj) return; - if (pObj->pTimer != tmrId) return; - - if (pObj->pHb == NULL) { - SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); - if (NULL == pSql) return; - - pSql->fp = tscProcessHeartBeatRsp; - - SQueryInfo *pQueryInfo = NULL; - tscGetQueryInfoDetailSafely(&pSql->cmd, 0, &pQueryInfo); - pQueryInfo->command = TSDB_SQL_HB; - - pSql->cmd.command = TSDB_SQL_HB; - if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) { - taosTFree(pSql); - return; - } - - pSql->cmd.command = TSDB_SQL_HB; - pSql->param = pObj; - pSql->pTscObj = pObj; - pSql->signature = pSql; - pObj->pHb = pSql; - tscAddSubqueryInfo(&pObj->pHb->cmd); - - tscDebug("%p HB is allocated, pObj:%p", pObj->pHb, pObj); + SSqlObj* pHB = pObj->pHb; + if (pObj->pTimer != tmrId || pHB == NULL) { + return; } - if (tscShouldFreeHeatBeat(pObj->pHb)) { - tscDebug("%p free HB object and release connection", pObj->pHb); - tscFreeSqlObj(pObj->pHb); + if (tscShouldFreeHeartBeat(pHB)) { + tscDebug("%p free HB object and release connection", pHB); + tscFreeSqlObj(pHB); tscCloseTscObj(pObj); } else { -// taosMsleep(500); - int32_t code = tscProcessSql(pObj->pHb); + int32_t code = tscProcessSql(pHB); if (code != TSDB_CODE_SUCCESS) { - tscError("%p failed to sent HB to server, reason:%s", pObj->pHb, tstrerror(code)); + tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code)); } } } @@ -268,6 +245,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { return; } + pSql->pRpcCtx = NULL; // clear the rpcCtx + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) { tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p", @@ -1956,6 +1935,35 @@ int tscProcessShowRsp(SSqlObj *pSql) { return 0; } +static void createHBObj(STscObj* pObj) { + if (pObj->pHb != NULL) { + return; + } + + SSqlObj *pSql = (SSqlObj *)calloc(1, sizeof(SSqlObj)); + if (NULL == pSql) return; + + pSql->fp = tscProcessHeartBeatRsp; + + SQueryInfo *pQueryInfo = NULL; + tscGetQueryInfoDetailSafely(&pSql->cmd, 0, &pQueryInfo); + pQueryInfo->command = TSDB_SQL_HB; + + pSql->cmd.command = pQueryInfo->command; + if (TSDB_CODE_SUCCESS != tscAllocPayload(&(pSql->cmd), TSDB_DEFAULT_PAYLOAD_SIZE)) { + taosTFree(pSql); + return; + } + + pSql->param = pObj; + pSql->pTscObj = pObj; + pSql->signature = pSql; + pObj->pHb = pSql; + tscAddSubqueryInfo(&pObj->pHb->cmd); + + tscDebug("%p HB is allocated, pObj:%p", pObj->pHb, pObj); +} + int tscProcessConnectRsp(SSqlObj *pSql) { char temp[TSDB_TABLE_FNAME_LEN * 2]; STscObj *pObj = pSql->pTscObj; @@ -1977,6 +1985,9 @@ int tscProcessConnectRsp(SSqlObj *pSql) { pObj->writeAuth = pConnect->writeAuth; pObj->superAuth = pConnect->superAuth; pObj->connId = htonl(pConnect->connId); + + createHBObj(pObj); + taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer); return 0; diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 94ab3a0c75acba62d9070c8549e17349fa3f0ac2..29c8aa0a561d30e2514a760a37298f088e7cbf80 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -216,10 +216,15 @@ void taos_close(TAOS *taos) { } if (pObj->pHb != NULL) { + if (pObj->pHb->pRpcCtx != NULL) { // wait for rsp from dnode + rpcCancelRequest(pObj->pHb->pRpcCtx); + } + tscSetFreeHeatBeat(pObj); - } else { - tscCloseTscObj(pObj); + tscFreeSqlObj(pObj->pHb); } + + tscCloseTscObj(pObj); } void waitForQueryRsp(void *param, TAOS_RES *tres, int code) { diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 60723676df295ce38cc4f6e60fab47ccaab5f1c0..1483f0de482c06cc4b98ecff0d11f11468dfe7ad 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1394,7 +1394,7 @@ void tscSetFreeHeatBeat(STscObj* pObj) { pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE; } -bool tscShouldFreeHeatBeat(SSqlObj* pHb) { +bool tscShouldFreeHeartBeat(SSqlObj* pHb) { assert(pHb == pHb->signature); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pHb->cmd, 0);