提交 33c48fe9 编写于 作者: K kailixu

fix: client hb logic fix and optimization

上级 143d3b2e
...@@ -24,6 +24,8 @@ typedef struct { ...@@ -24,6 +24,8 @@ typedef struct {
struct { struct {
int64_t clusterId; int64_t clusterId;
int32_t passKeyCnt; int32_t passKeyCnt;
int32_t passVer;
int32_t reqCnt;
}; };
}; };
} SHbParam; } SHbParam;
...@@ -536,14 +538,20 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) { ...@@ -536,14 +538,20 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SClientHbReq *req) { static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SHbParam *param, SClientHbReq *req) {
STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid); STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid);
if (!pTscObj) { if (!pTscObj) {
tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid); tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid);
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;
} }
int32_t code = 0; int32_t code = 0;
if ((param->passVer != INT32_MIN) && (param->passVer <= pTscObj->passInfo.ver)) {
tscDebug("hb got user basic info, no need since passVer %d <= %d", param->passVer, pTscObj->passInfo.ver);
goto _return;
}
SUserPassVersion *user = taosMemoryMalloc(sizeof(SUserPassVersion)); SUserPassVersion *user = taosMemoryMalloc(sizeof(SUserPassVersion));
if (!user) { if (!user) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
...@@ -570,6 +578,9 @@ static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SClientHbReq *req) { ...@@ -570,6 +578,9 @@ static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
goto _return; goto _return;
} }
// assign the passVer
param->passVer = pTscObj->passInfo.ver;
_return: _return:
releaseTscObj(connKey->tscRid); releaseTscObj(connKey->tscRid);
if (code) { if (code) {
...@@ -714,13 +725,16 @@ int32_t hbGetAppInfo(int64_t clusterId, SClientHbReq *req) { ...@@ -714,13 +725,16 @@ int32_t hbGetAppInfo(int64_t clusterId, SClientHbReq *req) {
} }
int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) {
SHbParam *hbParam = (SHbParam *)param; int32_t code = 0;
struct SCatalog *pCatalog = NULL; SHbParam *hbParam = (SHbParam *)param;
SCatalog *pCatalog = NULL;
int32_t code = catalogGetHandle(hbParam->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) { if (hbParam->reqCnt == 0) {
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code)); code = catalogGetHandle(hbParam->clusterId, &pCatalog);
return code; if (code != TSDB_CODE_SUCCESS) {
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
return code;
}
} }
hbGetAppInfo(hbParam->clusterId, req); hbGetAppInfo(hbParam->clusterId, req);
...@@ -728,24 +742,28 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req ...@@ -728,24 +742,28 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
hbGetQueryBasicInfo(connKey, req); hbGetQueryBasicInfo(connKey, req);
if (hbParam->passKeyCnt > 0) { if (hbParam->passKeyCnt > 0) {
hbGetUserBasicInfo(connKey, req); hbGetUserBasicInfo(connKey, hbParam, req);
} }
code = hbGetExpiredUserInfo(connKey, pCatalog, req); if (hbParam->reqCnt == 0) {
if (TSDB_CODE_SUCCESS != code) { code = hbGetExpiredUserInfo(connKey, pCatalog, req);
return code; if (TSDB_CODE_SUCCESS != code) {
} return code;
}
code = hbGetExpiredDBInfo(connKey, pCatalog, req); code = hbGetExpiredDBInfo(connKey, pCatalog, req);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
return code; return code;
} }
code = hbGetExpiredStbInfo(connKey, pCatalog, req); code = hbGetExpiredStbInfo(connKey, pCatalog, req);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
return code; return code;
}
} }
++hbParam->reqCnt; // success to get catalog info
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -766,6 +784,10 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { ...@@ -766,6 +784,10 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
} }
int32_t connKeyCnt = atomic_load_32(&pAppHbMgr->connKeyCnt); int32_t connKeyCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
pBatchReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq)); pBatchReq->reqs = taosArrayInit(connKeyCnt, sizeof(SClientHbReq));
if (!pBatchReq->reqs) {
tFreeClientHbBatchReq(pBatchReq);
return NULL;
}
int64_t rid = -1; int64_t rid = -1;
int32_t code = 0; int32_t code = 0;
...@@ -782,12 +804,18 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { ...@@ -782,12 +804,18 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
return NULL; return NULL;
} }
SHbParam param = {0};
while (pIter != NULL) { while (pIter != NULL) {
pOneReq = taosArrayPush(pBatchReq->reqs, pOneReq); pOneReq = taosArrayPush(pBatchReq->reqs, pOneReq);
SHbParam param;
switch (pOneReq->connKey.connType) { switch (pOneReq->connKey.connType) {
case CONN_TYPE__QUERY: { case CONN_TYPE__QUERY: {
param.clusterId = pOneReq->clusterId; if (param.clusterId == 0) {
// init
param.clusterId = pOneReq->clusterId;
param.passVer = INT32_MIN;
}
param.passKeyCnt = atomic_load_32(&pAppHbMgr->passKeyCnt); param.passKeyCnt = atomic_load_32(&pAppHbMgr->passKeyCnt);
break; break;
} }
...@@ -801,9 +829,7 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { ...@@ -801,9 +829,7 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
pOneReq->connKey.tscRid, pOneReq->connKey.connType); pOneReq->connKey.tscRid, pOneReq->connKey.connType);
} }
} }
break;
#if 0
if (code) { if (code) {
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
pOneReq = pIter; pOneReq = pIter;
...@@ -812,7 +838,6 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { ...@@ -812,7 +838,6 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
pOneReq = pIter; pOneReq = pIter;
#endif
} }
releaseTscObj(rid); releaseTscObj(rid);
...@@ -885,7 +910,6 @@ static void *hbThreadFunc(void *param) { ...@@ -885,7 +910,6 @@ static void *hbThreadFunc(void *param) {
hbGatherAppInfo(); hbGatherAppInfo();
} }
SArray *mgr = taosArrayInit(sz, sizeof(void *));
for (int i = 0; i < sz; i++) { for (int i = 0; i < sz; i++) {
SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i); SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
if (pAppHbMgr == NULL) { if (pAppHbMgr == NULL) {
...@@ -894,7 +918,6 @@ static void *hbThreadFunc(void *param) { ...@@ -894,7 +918,6 @@ static void *hbThreadFunc(void *param) {
int32_t connCnt = atomic_load_32(&pAppHbMgr->connKeyCnt); int32_t connCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
if (connCnt == 0) { if (connCnt == 0) {
taosArrayPush(mgr, &pAppHbMgr);
continue; continue;
} }
SClientHbBatchReq *pReq = hbGatherAllInfo(pAppHbMgr); SClientHbBatchReq *pReq = hbGatherAllInfo(pAppHbMgr);
...@@ -908,7 +931,6 @@ static void *hbThreadFunc(void *param) { ...@@ -908,7 +931,6 @@ static void *hbThreadFunc(void *param) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
tFreeClientHbBatchReq(pReq); tFreeClientHbBatchReq(pReq);
// hbClearReqInfo(pAppHbMgr); // hbClearReqInfo(pAppHbMgr);
taosArrayPush(mgr, &pAppHbMgr);
break; break;
} }
...@@ -920,7 +942,6 @@ static void *hbThreadFunc(void *param) { ...@@ -920,7 +942,6 @@ static void *hbThreadFunc(void *param) {
tFreeClientHbBatchReq(pReq); tFreeClientHbBatchReq(pReq);
// hbClearReqInfo(pAppHbMgr); // hbClearReqInfo(pAppHbMgr);
taosMemoryFree(buf); taosMemoryFree(buf);
taosArrayPush(mgr, &pAppHbMgr);
break; break;
} }
pInfo->fp = hbAsyncCallBack; pInfo->fp = hbAsyncCallBack;
...@@ -941,12 +962,8 @@ static void *hbThreadFunc(void *param) { ...@@ -941,12 +962,8 @@ static void *hbThreadFunc(void *param) {
// hbClearReqInfo(pAppHbMgr); // hbClearReqInfo(pAppHbMgr);
atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1); atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
taosArrayPush(mgr, &pAppHbMgr);
} }
taosArrayDestroy(clientHbMgr.appHbMgrs);
clientHbMgr.appHbMgrs = mgr;
taosThreadMutexUnlock(&clientHbMgr.lock); taosThreadMutexUnlock(&clientHbMgr.lock);
taosMsleep(HEARTBEAT_INTERVAL); taosMsleep(HEARTBEAT_INTERVAL);
...@@ -1179,6 +1196,4 @@ void hbDeregisterConn(STscObj *pTscObj, SClientHbKey connKey) { ...@@ -1179,6 +1196,4 @@ void hbDeregisterConn(STscObj *pTscObj, SClientHbKey connKey) {
} }
// set heart beat thread quit mode , if quicByKill 1 then kill thread else quit from inner // set heart beat thread quit mode , if quicByKill 1 then kill thread else quit from inner
void taos_set_hb_quit(int8_t quitByKill) { void taos_set_hb_quit(int8_t quitByKill) { clientHbMgr.quitByKill = quitByKill; }
clientHbMgr.quitByKill = quitByKill;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册