未验证 提交 ad7509b1 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #21315 from taosdata/enh/TD-23421-M

fix: client hb logic fix and optimization
......@@ -24,6 +24,8 @@ typedef struct {
struct {
int64_t clusterId;
int32_t passKeyCnt;
int32_t passVer;
int32_t reqCnt;
};
};
} SHbParam;
......@@ -536,14 +538,20 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
return TSDB_CODE_SUCCESS;
}
static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SHbParam *param, SClientHbReq *req) {
STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid);
if (!pTscObj) {
tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid);
return TSDB_CODE_APP_ERROR;
}
int32_t code = 0;
int32_t code = 0;
if (param && (param->passVer != INT32_MIN) && (param->passVer <= pTscObj->passInfo.ver)) {
tscDebug("hb got user basic info, no need since passVer %d <= %d", param->passVer, pTscObj->passInfo.ver);
goto _return;
}
SUserPassVersion *user = taosMemoryMalloc(sizeof(SUserPassVersion));
if (!user) {
code = TSDB_CODE_OUT_OF_MEMORY;
......@@ -570,6 +578,11 @@ static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
goto _return;
}
// assign the passVer
if (param) {
param->passVer = pTscObj->passInfo.ver;
}
_return:
releaseTscObj(connKey->tscRid);
if (code) {
......@@ -714,13 +727,16 @@ int32_t hbGetAppInfo(int64_t clusterId, SClientHbReq *req) {
}
int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) {
SHbParam *hbParam = (SHbParam *)param;
struct SCatalog *pCatalog = NULL;
int32_t code = catalogGetHandle(hbParam->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
return code;
int32_t code = 0;
SHbParam *hbParam = (SHbParam *)param;
SCatalog *pCatalog = NULL;
if (hbParam->reqCnt == 0) {
code = catalogGetHandle(hbParam->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
return code;
}
}
hbGetAppInfo(hbParam->clusterId, req);
......@@ -728,24 +744,28 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
hbGetQueryBasicInfo(connKey, req);
if (hbParam->passKeyCnt > 0) {
hbGetUserBasicInfo(connKey, req);
hbGetUserBasicInfo(connKey, hbParam, req);
}
code = hbGetExpiredUserInfo(connKey, pCatalog, req);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
if (hbParam->reqCnt == 0) {
code = hbGetExpiredUserInfo(connKey, pCatalog, req);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
code = hbGetExpiredDBInfo(connKey, pCatalog, req);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
code = hbGetExpiredDBInfo(connKey, pCatalog, req);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
code = hbGetExpiredStbInfo(connKey, pCatalog, req);
if (TSDB_CODE_SUCCESS != code) {
return code;
code = hbGetExpiredStbInfo(connKey, pCatalog, req);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
}
++hbParam->reqCnt; // success to get catalog info
return TSDB_CODE_SUCCESS;
}
......@@ -766,6 +786,10 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
}
int32_t connKeyCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
pBatchReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq));
if (!pBatchReq->reqs) {
tFreeClientHbBatchReq(pBatchReq);
return NULL;
}
int64_t rid = -1;
int32_t code = 0;
......@@ -782,12 +806,18 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
return NULL;
}
SHbParam param = {0};
while (pIter != NULL) {
pOneReq = taosArrayPush(pBatchReq->reqs, pOneReq);
SHbParam param;
switch (pOneReq->connKey.connType) {
case CONN_TYPE__QUERY: {
param.clusterId = pOneReq->clusterId;
if (param.clusterId == 0) {
// init
param.clusterId = pOneReq->clusterId;
param.passVer = INT32_MIN;
}
param.passKeyCnt = atomic_load_32(&pAppHbMgr->passKeyCnt);
break;
}
......@@ -801,9 +831,7 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
pOneReq->connKey.tscRid, pOneReq->connKey.connType);
}
}
break;
#if 0
if (code) {
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
pOneReq = pIter;
......@@ -812,7 +840,6 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
pOneReq = pIter;
#endif
}
releaseTscObj(rid);
......@@ -885,7 +912,6 @@ static void *hbThreadFunc(void *param) {
hbGatherAppInfo();
}
SArray *mgr = taosArrayInit(sz, sizeof(void *));
for (int i = 0; i < sz; i++) {
SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
if (pAppHbMgr == NULL) {
......@@ -894,7 +920,6 @@ static void *hbThreadFunc(void *param) {
int32_t connCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
if (connCnt == 0) {
taosArrayPush(mgr, &pAppHbMgr);
continue;
}
SClientHbBatchReq *pReq = hbGatherAllInfo(pAppHbMgr);
......@@ -908,7 +933,6 @@ static void *hbThreadFunc(void *param) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tFreeClientHbBatchReq(pReq);
// hbClearReqInfo(pAppHbMgr);
taosArrayPush(mgr, &pAppHbMgr);
break;
}
......@@ -920,7 +944,6 @@ static void *hbThreadFunc(void *param) {
tFreeClientHbBatchReq(pReq);
// hbClearReqInfo(pAppHbMgr);
taosMemoryFree(buf);
taosArrayPush(mgr, &pAppHbMgr);
break;
}
pInfo->fp = hbAsyncCallBack;
......@@ -941,12 +964,8 @@ static void *hbThreadFunc(void *param) {
// hbClearReqInfo(pAppHbMgr);
atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
taosArrayPush(mgr, &pAppHbMgr);
}
taosArrayDestroy(clientHbMgr.appHbMgrs);
clientHbMgr.appHbMgrs = mgr;
taosThreadMutexUnlock(&clientHbMgr.lock);
taosMsleep(HEARTBEAT_INTERVAL);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册