未验证 提交 9ab4c438 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #16049 from taosdata/fix/TD-18359

fix: fix dumplicated ctg callback issue
...@@ -1105,6 +1105,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu ...@@ -1105,6 +1105,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
SName* pName = ctgGetFetchName(ctx->pNames, pFetch); SName* pName = ctgGetFetchName(ctx->pNames, pFetch);
int32_t flag = pFetch->flag; int32_t flag = pFetch->flag;
int32_t* vgId = &pFetch->vgId; int32_t* vgId = &pFetch->vgId;
bool taskDone = false;
CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target)); CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target));
...@@ -1250,6 +1251,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu ...@@ -1250,6 +1251,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
pOut->tbMeta = NULL; pOut->tbMeta = NULL;
if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) { if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) {
TSWAP(pTask->res, ctx->pResList); TSWAP(pTask->res, ctx->pResList);
taskDone = true;
} }
_return: _return:
...@@ -1264,10 +1266,11 @@ _return: ...@@ -1264,10 +1266,11 @@ _return:
pRes->pRes = NULL; pRes->pRes = NULL;
if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) { if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) {
TSWAP(pTask->res, ctx->pResList); TSWAP(pTask->res, ctx->pResList);
taskDone = true;
} }
} }
if (pTask->res) { if (pTask->res && taskDone) {
ctgHandleTaskEnd(pTask, code); ctgHandleTaskEnd(pTask, code);
} }
...@@ -1354,6 +1357,7 @@ int32_t ctgHandleGetTbHashsRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu ...@@ -1354,6 +1357,7 @@ int32_t ctgHandleGetTbHashsRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx);
SCtgFetch* pFetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx); SCtgFetch* pFetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx);
bool taskDone = false;
CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target)); CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target));
...@@ -1377,6 +1381,7 @@ int32_t ctgHandleGetTbHashsRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu ...@@ -1377,6 +1381,7 @@ int32_t ctgHandleGetTbHashsRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) { if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) {
TSWAP(pTask->res, ctx->pResList); TSWAP(pTask->res, ctx->pResList);
taskDone = true;
} }
_return: _return:
...@@ -1392,10 +1397,11 @@ _return: ...@@ -1392,10 +1397,11 @@ _return:
if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) { if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) {
TSWAP(pTask->res, ctx->pResList); TSWAP(pTask->res, ctx->pResList);
taskDone = true;
} }
} }
if (pTask->res) { if (pTask->res && taskDone) {
ctgHandleTaskEnd(pTask, code); ctgHandleTaskEnd(pTask, code);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册