未验证 提交 ef5c9cfc 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #16848 from taosdata/fix/avoidTscDeadlock

enh(tsc): handle deadlock
......@@ -173,7 +173,8 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
pTscObj->pAppInfo->totalDnodes = pRsp->query->totalDnodes;
pTscObj->pAppInfo->onlineDnodes = pRsp->query->onlineDnodes;
pTscObj->connId = pRsp->query->connId;
tscTrace("conn %p hb rsp, dnodes %d/%d", pTscObj->connId, pTscObj->pAppInfo->onlineDnodes, pTscObj->pAppInfo->totalDnodes);
tscTrace("conn %p hb rsp, dnodes %d/%d", pTscObj->connId, pTscObj->pAppInfo->onlineDnodes,
pTscObj->pAppInfo->totalDnodes);
if (pRsp->query->killRid) {
tscDebug("request rid %" PRIx64 " need to be killed now", pRsp->query->killRid);
......@@ -297,7 +298,8 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
if (code != 0) {
(*pInst)->onlineDnodes = ((*pInst)->totalDnodes ? 0 : -1);
tscDebug("hb rsp error %s, update server status %d/%d", tstrerror(code), (*pInst)->onlineDnodes, (*pInst)->totalDnodes);
tscDebug("hb rsp error %s, update server status %d/%d", tstrerror(code), (*pInst)->onlineDnodes,
(*pInst)->totalDnodes);
}
if (rspNum) {
......@@ -657,6 +659,8 @@ int32_t hbGatherAppInfo(void) {
for (int32_t i = 0; i < sz; ++i) {
SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
if (pAppHbMgr == NULL) continue;
uint64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId;
SAppHbReq *pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId));
if (NULL == pApp) {
......@@ -694,15 +698,21 @@ static void *hbThreadFunc(void *param) {
hbGatherAppInfo();
}
SArray *mgr = taosArrayInit(sz, sizeof(void *));
for (int i = 0; i < sz; i++) {
SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
if (pAppHbMgr == NULL) {
continue;
}
int32_t connCnt = atomic_load_32(&pAppHbMgr->connKeyCnt);
if (connCnt == 0) {
taosArrayPush(mgr, &pAppHbMgr);
continue;
}
SClientHbBatchReq *pReq = hbGatherAllInfo(pAppHbMgr);
if (pReq == NULL) {
if (pReq == NULL || taosArrayGetP(clientHbMgr.appHbMgrs, i) == NULL) {
tFreeClientHbBatchReq(pReq);
continue;
}
int tlen = tSerializeSClientHbBatchReq(NULL, 0, pReq);
......@@ -711,6 +721,7 @@ static void *hbThreadFunc(void *param) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
tFreeClientHbBatchReq(pReq);
// hbClearReqInfo(pAppHbMgr);
taosArrayPush(mgr, &pAppHbMgr);
break;
}
......@@ -722,6 +733,7 @@ static void *hbThreadFunc(void *param) {
tFreeClientHbBatchReq(pReq);
// hbClearReqInfo(pAppHbMgr);
taosMemoryFree(buf);
taosArrayPush(mgr, &pAppHbMgr);
break;
}
pInfo->fp = hbAsyncCallBack;
......@@ -729,7 +741,7 @@ static void *hbThreadFunc(void *param) {
pInfo->msgInfo.len = tlen;
pInfo->msgType = TDMT_MND_HEARTBEAT;
pInfo->param = strdup(pAppHbMgr->key);
pInfo->paramFreeFp = taosMemoryFree;
pInfo->paramFreeFp = taosMemoryFree;
pInfo->requestId = generateRequestId();
pInfo->requestObjRefId = 0;
......@@ -741,8 +753,12 @@ static void *hbThreadFunc(void *param) {
// hbClearReqInfo(pAppHbMgr);
atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
taosArrayPush(mgr, &pAppHbMgr);
}
taosArrayDestroy(clientHbMgr.appHbMgrs);
clientHbMgr.appHbMgrs = mgr;
taosThreadMutexUnlock(&clientHbMgr.lock);
taosMsleep(HEARTBEAT_INTERVAL);
......@@ -834,7 +850,7 @@ void hbRemoveAppHbMrg(SAppHbMgr **pAppHbMgr) {
if (pItem == *pAppHbMgr) {
hbFreeAppHbMgr(*pAppHbMgr);
*pAppHbMgr = NULL;
taosArrayRemove(clientHbMgr.appHbMgrs, i);
taosArraySet(clientHbMgr.appHbMgrs, i, pAppHbMgr);
break;
}
}
......@@ -845,6 +861,7 @@ void appHbMgrCleanup(void) {
int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
for (int i = 0; i < sz; i++) {
SAppHbMgr *pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i);
if (pTarget == NULL) continue;
hbFreeAppHbMgr(pTarget);
}
}
......@@ -859,7 +876,14 @@ int hbMgrInit() {
clientHbMgr.appSummary = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
clientHbMgr.appHbMgrs = taosArrayInit(0, sizeof(void *));
taosThreadMutexInit(&clientHbMgr.lock, NULL);
TdThreadMutexAttr attr = {0};
taosThreadMutexAttrSetType(&attr, PTHREAD_MUTEX_RECURSIVE);
int ret = taosThreadMutexAttrInit(&attr);
assert(ret == 0);
taosThreadMutexInit(&clientHbMgr.lock, &attr);
taosThreadMutexAttrDestroy(&attr);
// init handle funcs
hbMgrInitHandle();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册