diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 10c42bb67d2961ac0530333fbb1e9fd4a2294062..dad6627c870806ce237f26c058793cb23a257a31 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -24,6 +24,8 @@ typedef struct { struct { int64_t clusterId; int32_t passKeyCnt; + int32_t passVer; + int32_t reqCnt; }; }; } SHbParam; @@ -536,14 +538,20 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) { return TSDB_CODE_SUCCESS; } -static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SClientHbReq *req) { +static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SHbParam *param, SClientHbReq *req) { STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid); if (!pTscObj) { tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid); return TSDB_CODE_APP_ERROR; } - int32_t code = 0; + int32_t code = 0; + + if ((param->passVer != INT32_MIN) && (param->passVer <= pTscObj->passInfo.ver)) { + tscDebug("hb got user basic info, no need since passVer %d <= %d", param->passVer, pTscObj->passInfo.ver); + goto _return; + } + SUserPassVersion *user = taosMemoryMalloc(sizeof(SUserPassVersion)); if (!user) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -570,6 +578,9 @@ static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SClientHbReq *req) { goto _return; } + // assign the passVer + param->passVer = pTscObj->passInfo.ver; + _return: releaseTscObj(connKey->tscRid); if (code) { @@ -714,13 +725,16 @@ int32_t hbGetAppInfo(int64_t clusterId, SClientHbReq *req) { } int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { - SHbParam *hbParam = (SHbParam *)param; - struct SCatalog *pCatalog = NULL; - - int32_t code = catalogGetHandle(hbParam->clusterId, &pCatalog); - if (code != TSDB_CODE_SUCCESS) { - tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code)); - return code; + int32_t code = 0; + SHbParam *hbParam = (SHbParam *)param; + SCatalog *pCatalog = NULL; + + if (hbParam->reqCnt == 0) { + code = catalogGetHandle(hbParam->clusterId, &pCatalog); + if (code != TSDB_CODE_SUCCESS) { + tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code)); + return code; + } } hbGetAppInfo(hbParam->clusterId, req); @@ -728,24 +742,28 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req hbGetQueryBasicInfo(connKey, req); if (hbParam->passKeyCnt > 0) { - hbGetUserBasicInfo(connKey, req); + hbGetUserBasicInfo(connKey, hbParam, req); } - code = hbGetExpiredUserInfo(connKey, pCatalog, req); - if (TSDB_CODE_SUCCESS != code) { - return code; - } + if (hbParam->reqCnt == 0) { + code = hbGetExpiredUserInfo(connKey, pCatalog, req); + if (TSDB_CODE_SUCCESS != code) { + return code; + } - code = hbGetExpiredDBInfo(connKey, pCatalog, req); - if (TSDB_CODE_SUCCESS != code) { - return code; - } + code = hbGetExpiredDBInfo(connKey, pCatalog, req); + if (TSDB_CODE_SUCCESS != code) { + return code; + } - code = hbGetExpiredStbInfo(connKey, pCatalog, req); - if (TSDB_CODE_SUCCESS != code) { - return code; + code = hbGetExpiredStbInfo(connKey, pCatalog, req); + if (TSDB_CODE_SUCCESS != code) { + return code; + } } + ++hbParam->reqCnt; // success to get catalog info + return TSDB_CODE_SUCCESS; } @@ -766,6 +784,10 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { } int32_t connKeyCnt = atomic_load_32(&pAppHbMgr->connKeyCnt); pBatchReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq)); + if (!pBatchReq->reqs) { + tFreeClientHbBatchReq(pBatchReq); + return NULL; + } int64_t rid = -1; int32_t code = 0; @@ -782,12 +804,18 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { return NULL; } + SHbParam param = {0}; + while (pIter != NULL) { pOneReq = taosArrayPush(pBatchReq->reqs, pOneReq); - SHbParam param; + switch (pOneReq->connKey.connType) { case CONN_TYPE__QUERY: { - param.clusterId = pOneReq->clusterId; + if (param.clusterId == 0) { + // init + param.clusterId = pOneReq->clusterId; + param.passVer = INT32_MIN; + } param.passKeyCnt = atomic_load_32(&pAppHbMgr->passKeyCnt); break; } @@ -801,9 +829,7 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { pOneReq->connKey.tscRid, pOneReq->connKey.connType); } } - break; -#if 0 if (code) { pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); pOneReq = pIter; @@ -812,7 +838,6 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); pOneReq = pIter; -#endif } releaseTscObj(rid); @@ -885,7 +910,6 @@ static void *hbThreadFunc(void *param) { hbGatherAppInfo(); } - SArray *mgr = taosArrayInit(sz, sizeof(void *)); for (int i = 0; i < sz; i++) { SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i); if (pAppHbMgr == NULL) { @@ -894,7 +918,6 @@ static void *hbThreadFunc(void *param) { int32_t connCnt = atomic_load_32(&pAppHbMgr->connKeyCnt); if (connCnt == 0) { - taosArrayPush(mgr, &pAppHbMgr); continue; } SClientHbBatchReq *pReq = hbGatherAllInfo(pAppHbMgr); @@ -908,7 +931,6 @@ static void *hbThreadFunc(void *param) { terrno = TSDB_CODE_OUT_OF_MEMORY; tFreeClientHbBatchReq(pReq); // hbClearReqInfo(pAppHbMgr); - taosArrayPush(mgr, &pAppHbMgr); break; } @@ -920,7 +942,6 @@ static void *hbThreadFunc(void *param) { tFreeClientHbBatchReq(pReq); // hbClearReqInfo(pAppHbMgr); taosMemoryFree(buf); - taosArrayPush(mgr, &pAppHbMgr); break; } pInfo->fp = hbAsyncCallBack; @@ -941,12 +962,8 @@ static void *hbThreadFunc(void *param) { // hbClearReqInfo(pAppHbMgr); atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1); - taosArrayPush(mgr, &pAppHbMgr); } - taosArrayDestroy(clientHbMgr.appHbMgrs); - clientHbMgr.appHbMgrs = mgr; - taosThreadMutexUnlock(&clientHbMgr.lock); taosMsleep(HEARTBEAT_INTERVAL); @@ -1179,6 +1196,4 @@ void hbDeregisterConn(STscObj *pTscObj, SClientHbKey connKey) { } // set heart beat thread quit mode , if quicByKill 1 then kill thread else quit from inner -void taos_set_hb_quit(int8_t quitByKill) { - clientHbMgr.quitByKill = quitByKill; -} +void taos_set_hb_quit(int8_t quitByKill) { clientHbMgr.quitByKill = quitByKill; }