未验证 提交 313e7259 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #15855 from taosdata/fix/TD-18261

fix: fix catalog crash issue
...@@ -279,6 +279,7 @@ typedef struct SCtgMsgCtx { ...@@ -279,6 +279,7 @@ typedef struct SCtgMsgCtx {
void* lastOut; void* lastOut;
void* out; void* out;
char* target; char* target;
SHashObj* pBatchs;
} SCtgMsgCtx; } SCtgMsgCtx;
...@@ -315,7 +316,6 @@ typedef struct SCtgTask { ...@@ -315,7 +316,6 @@ typedef struct SCtgTask {
SRWLatch lock; SRWLatch lock;
SArray* pParents; SArray* pParents;
SCtgSubRes subRes; SCtgSubRes subRes;
SHashObj* pBatchs;
} SCtgTask; } SCtgTask;
typedef struct SCtgTaskReq { typedef struct SCtgTaskReq {
......
...@@ -855,6 +855,7 @@ int32_t ctgCallSubCb(SCtgTask *pTask) { ...@@ -855,6 +855,7 @@ int32_t ctgCallSubCb(SCtgTask *pTask) {
int32_t parentNum = taosArrayGetSize(pTask->pParents); int32_t parentNum = taosArrayGetSize(pTask->pParents);
for (int32_t i = 0; i < parentNum; ++i) { for (int32_t i = 0; i < parentNum; ++i) {
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
SCtgTask* pParent = taosArrayGetP(pTask->pParents, i); SCtgTask* pParent = taosArrayGetP(pTask->pParents, i);
pParent->subRes.code = pTask->code; pParent->subRes.code = pTask->code;
...@@ -865,7 +866,9 @@ int32_t ctgCallSubCb(SCtgTask *pTask) { ...@@ -865,7 +866,9 @@ int32_t ctgCallSubCb(SCtgTask *pTask) {
} }
} }
pParent->pBatchs = pTask->pBatchs; SCtgMsgCtx *pParMsgCtx = CTG_GET_TASK_MSGCTX(pParent, -1);
pParMsgCtx->pBatchs = pMsgCtx->pBatchs;
CTG_ERR_JRET(pParent->subRes.fp(pParent)); CTG_ERR_JRET(pParent->subRes.fp(pParent));
} }
...@@ -1625,6 +1628,11 @@ _return: ...@@ -1625,6 +1628,11 @@ _return:
int32_t ctgLaunchGetTbMetaTask(SCtgTask *pTask) { int32_t ctgLaunchGetTbMetaTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs;
}
CTG_ERR_RET(ctgGetTbMetaFromCache(pCtg, pConn, (SCtgTbMetaCtx*)pTask->taskCtx, (STableMeta**)&pTask->res)); CTG_ERR_RET(ctgGetTbMetaFromCache(pCtg, pConn, (SCtgTbMetaCtx*)pTask->taskCtx, (STableMeta**)&pTask->res));
if (pTask->res) { if (pTask->res) {
...@@ -1645,6 +1653,7 @@ int32_t ctgLaunchGetTbMetasTask(SCtgTask *pTask) { ...@@ -1645,6 +1653,7 @@ int32_t ctgLaunchGetTbMetasTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgTbMetasCtx* pCtx = (SCtgTbMetasCtx*)pTask->taskCtx; SCtgTbMetasCtx* pCtx = (SCtgTbMetasCtx*)pTask->taskCtx;
SCtgJob* pJob = pTask->pJob;
int32_t dbNum = taosArrayGetSize(pCtx->pNames); int32_t dbNum = taosArrayGetSize(pCtx->pNames);
int32_t fetchIdx = 0; int32_t fetchIdx = 0;
...@@ -1670,7 +1679,11 @@ int32_t ctgLaunchGetTbMetasTask(SCtgTask *pTask) { ...@@ -1670,7 +1679,11 @@ int32_t ctgLaunchGetTbMetasTask(SCtgTask *pTask) {
for (int32_t i = 0; i < pCtx->fetchNum; ++i) { for (int32_t i = 0; i < pCtx->fetchNum; ++i) {
SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i); SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i);
SName* pName = ctgGetFetchName(pCtx->pNames, pFetch); SName* pName = ctgGetFetchName(pCtx->pNames, pFetch);
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, i);
if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs;
}
SCtgTaskReq tReq; SCtgTaskReq tReq;
tReq.pTask = pTask; tReq.pTask = pTask;
tReq.msgIdx = pFetch->fetchIdx; tReq.msgIdx = pFetch->fetchIdx;
...@@ -1686,6 +1699,11 @@ int32_t ctgLaunchGetDbVgTask(SCtgTask *pTask) { ...@@ -1686,6 +1699,11 @@ int32_t ctgLaunchGetDbVgTask(SCtgTask *pTask) {
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
SCtgDbVgCtx* pCtx = (SCtgDbVgCtx*)pTask->taskCtx; SCtgDbVgCtx* pCtx = (SCtgDbVgCtx*)pTask->taskCtx;
SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs;
}
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, pCtx->dbFName, &dbCache)); CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, pCtx->dbFName, &dbCache));
if (NULL != dbCache) { if (NULL != dbCache) {
...@@ -1722,6 +1740,11 @@ int32_t ctgLaunchGetTbHashTask(SCtgTask *pTask) { ...@@ -1722,6 +1740,11 @@ int32_t ctgLaunchGetTbHashTask(SCtgTask *pTask) {
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
SCtgTbHashCtx* pCtx = (SCtgTbHashCtx*)pTask->taskCtx; SCtgTbHashCtx* pCtx = (SCtgTbHashCtx*)pTask->taskCtx;
SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs;
}
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, pCtx->dbFName, &dbCache)); CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, pCtx->dbFName, &dbCache));
if (NULL != dbCache) { if (NULL != dbCache) {
...@@ -1761,6 +1784,7 @@ int32_t ctgLaunchGetTbHashsTask(SCtgTask *pTask) { ...@@ -1761,6 +1784,7 @@ int32_t ctgLaunchGetTbHashsTask(SCtgTask *pTask) {
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgTbHashsCtx* pCtx = (SCtgTbHashsCtx*)pTask->taskCtx; SCtgTbHashsCtx* pCtx = (SCtgTbHashsCtx*)pTask->taskCtx;
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
SCtgJob* pJob = pTask->pJob;
int32_t dbNum = taosArrayGetSize(pCtx->pNames); int32_t dbNum = taosArrayGetSize(pCtx->pNames);
int32_t fetchIdx = 0; int32_t fetchIdx = 0;
int32_t baseResIdx = 0; int32_t baseResIdx = 0;
...@@ -1803,7 +1827,11 @@ int32_t ctgLaunchGetTbHashsTask(SCtgTask *pTask) { ...@@ -1803,7 +1827,11 @@ int32_t ctgLaunchGetTbHashsTask(SCtgTask *pTask) {
for (int32_t i = 0; i < pCtx->fetchNum; ++i) { for (int32_t i = 0; i < pCtx->fetchNum; ++i) {
SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i); SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i);
STablesReq* pReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx); STablesReq* pReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx);
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, i);
if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs;
}
SBuildUseDBInput input = {0}; SBuildUseDBInput input = {0};
strcpy(input.db, pReq->dbFName); strcpy(input.db, pReq->dbFName);
...@@ -1831,6 +1859,11 @@ int32_t ctgLaunchGetTbIndexTask(SCtgTask *pTask) { ...@@ -1831,6 +1859,11 @@ int32_t ctgLaunchGetTbIndexTask(SCtgTask *pTask) {
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgTbIndexCtx* pCtx = (SCtgTbIndexCtx*)pTask->taskCtx; SCtgTbIndexCtx* pCtx = (SCtgTbIndexCtx*)pTask->taskCtx;
SArray* pRes = NULL; SArray* pRes = NULL;
SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs;
}
CTG_ERR_RET(ctgReadTbIndexFromCache(pCtg, pCtx->pName, &pRes)); CTG_ERR_RET(ctgReadTbIndexFromCache(pCtg, pCtx->pName, &pRes));
if (pRes) { if (pRes) {
...@@ -1852,6 +1885,11 @@ int32_t ctgLaunchGetTbCfgTask(SCtgTask *pTask) { ...@@ -1852,6 +1885,11 @@ int32_t ctgLaunchGetTbCfgTask(SCtgTask *pTask) {
SArray* pRes = NULL; SArray* pRes = NULL;
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pCtx->pName, dbFName); tNameGetFullDbName(pCtx->pName, dbFName);
SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs;
}
if (pCtx->tbType <= 0) { if (pCtx->tbType <= 0) {
CTG_ERR_JRET(ctgReadTbTypeFromCache(pCtg, dbFName, pCtx->pName->tname, &pCtx->tbType)); CTG_ERR_JRET(ctgReadTbTypeFromCache(pCtg, dbFName, pCtx->pName->tname, &pCtx->tbType));
...@@ -1890,6 +1928,11 @@ _return: ...@@ -1890,6 +1928,11 @@ _return:
int32_t ctgLaunchGetQnodeTask(SCtgTask *pTask) { int32_t ctgLaunchGetQnodeTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs;
}
CTG_ERR_RET(ctgGetQnodeListFromMnode(pCtg, pConn, NULL, pTask)); CTG_ERR_RET(ctgGetQnodeListFromMnode(pCtg, pConn, NULL, pTask));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1898,6 +1941,11 @@ int32_t ctgLaunchGetQnodeTask(SCtgTask *pTask) { ...@@ -1898,6 +1941,11 @@ int32_t ctgLaunchGetQnodeTask(SCtgTask *pTask) {
int32_t ctgLaunchGetDnodeTask(SCtgTask *pTask) { int32_t ctgLaunchGetDnodeTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs;
}
CTG_ERR_RET(ctgGetDnodeListFromMnode(pCtg, pConn, NULL, pTask)); CTG_ERR_RET(ctgGetDnodeListFromMnode(pCtg, pConn, NULL, pTask));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1908,6 +1956,11 @@ int32_t ctgLaunchGetDbCfgTask(SCtgTask *pTask) { ...@@ -1908,6 +1956,11 @@ int32_t ctgLaunchGetDbCfgTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgDbCfgCtx* pCtx = (SCtgDbCfgCtx*)pTask->taskCtx; SCtgDbCfgCtx* pCtx = (SCtgDbCfgCtx*)pTask->taskCtx;
SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs;
}
CTG_ERR_RET(ctgGetDBCfgFromMnode(pCtg, pConn, pCtx->dbFName, NULL, pTask)); CTG_ERR_RET(ctgGetDBCfgFromMnode(pCtg, pConn, pCtx->dbFName, NULL, pTask));
...@@ -1919,6 +1972,11 @@ int32_t ctgLaunchGetDbInfoTask(SCtgTask *pTask) { ...@@ -1919,6 +1972,11 @@ int32_t ctgLaunchGetDbInfoTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
SCtgDbInfoCtx* pCtx = (SCtgDbInfoCtx*)pTask->taskCtx; SCtgDbInfoCtx* pCtx = (SCtgDbInfoCtx*)pTask->taskCtx;
SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs;
}
pTask->res = taosMemoryCalloc(1, sizeof(SDbInfo)); pTask->res = taosMemoryCalloc(1, sizeof(SDbInfo));
if (NULL == pTask->res) { if (NULL == pTask->res) {
...@@ -1953,6 +2011,11 @@ int32_t ctgLaunchGetIndexTask(SCtgTask *pTask) { ...@@ -1953,6 +2011,11 @@ int32_t ctgLaunchGetIndexTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgIndexCtx* pCtx = (SCtgIndexCtx*)pTask->taskCtx; SCtgIndexCtx* pCtx = (SCtgIndexCtx*)pTask->taskCtx;
SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs;
}
CTG_ERR_RET(ctgGetIndexInfoFromMnode(pCtg, pConn, pCtx->indexFName, NULL, pTask)); CTG_ERR_RET(ctgGetIndexInfoFromMnode(pCtg, pConn, pCtx->indexFName, NULL, pTask));
...@@ -1963,6 +2026,11 @@ int32_t ctgLaunchGetUdfTask(SCtgTask *pTask) { ...@@ -1963,6 +2026,11 @@ int32_t ctgLaunchGetUdfTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgUdfCtx* pCtx = (SCtgUdfCtx*)pTask->taskCtx; SCtgUdfCtx* pCtx = (SCtgUdfCtx*)pTask->taskCtx;
SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs;
}
CTG_ERR_RET(ctgGetUdfInfoFromMnode(pCtg, pConn, pCtx->udfName, NULL, pTask)); CTG_ERR_RET(ctgGetUdfInfoFromMnode(pCtg, pConn, pCtx->udfName, NULL, pTask));
...@@ -1975,6 +2043,11 @@ int32_t ctgLaunchGetUserTask(SCtgTask *pTask) { ...@@ -1975,6 +2043,11 @@ int32_t ctgLaunchGetUserTask(SCtgTask *pTask) {
SCtgUserCtx* pCtx = (SCtgUserCtx*)pTask->taskCtx; SCtgUserCtx* pCtx = (SCtgUserCtx*)pTask->taskCtx;
bool inCache = false; bool inCache = false;
bool pass = false; bool pass = false;
SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs;
}
CTG_ERR_RET(ctgChkAuthFromCache(pCtg, pCtx->user.user, pCtx->user.dbFName, pCtx->user.type, &inCache, &pass)); CTG_ERR_RET(ctgChkAuthFromCache(pCtg, pCtx->user.user, pCtx->user.dbFName, pCtx->user.type, &inCache, &pass));
if (inCache) { if (inCache) {
...@@ -1996,6 +2069,11 @@ int32_t ctgLaunchGetUserTask(SCtgTask *pTask) { ...@@ -1996,6 +2069,11 @@ int32_t ctgLaunchGetUserTask(SCtgTask *pTask) {
int32_t ctgLaunchGetSvrVerTask(SCtgTask *pTask) { int32_t ctgLaunchGetSvrVerTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgJob* pJob = pTask->pJob;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
if (NULL == pMsgCtx->pBatchs) {
pMsgCtx->pBatchs = pJob->pBatchs;
}
CTG_ERR_RET(ctgGetSvrVerFromMnode(pCtg, pConn, NULL, pTask)); CTG_ERR_RET(ctgGetSvrVerFromMnode(pCtg, pConn, NULL, pTask));
...@@ -2129,7 +2207,10 @@ int32_t ctgSetSubTaskCb(SCtgTask *pSub, SCtgTask *pTask) { ...@@ -2129,7 +2207,10 @@ int32_t ctgSetSubTaskCb(SCtgTask *pSub, SCtgTask *pTask) {
if (CTG_TASK_DONE == pSub->status) { if (CTG_TASK_DONE == pSub->status) {
pTask->subRes.code = pSub->code; pTask->subRes.code = pSub->code;
CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].cloneFp)(pSub, &pTask->subRes.res)); CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].cloneFp)(pSub, &pTask->subRes.res));
pTask->pBatchs = pSub->pBatchs; SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
SCtgMsgCtx *pSubMsgCtx = CTG_GET_TASK_MSGCTX(pSub, -1);
pMsgCtx->pBatchs = pSubMsgCtx->pBatchs;
CTG_ERR_JRET(pTask->subRes.fp(pTask)); CTG_ERR_JRET(pTask->subRes.fp(pTask));
} else { } else {
if (NULL == pSub->pParents) { if (NULL == pSub->pParents) {
...@@ -2167,7 +2248,10 @@ int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, ...@@ -2167,7 +2248,10 @@ int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp,
CTG_ERR_RET(ctgSetSubTaskCb(pSub, pTask)); CTG_ERR_RET(ctgSetSubTaskCb(pSub, pTask));
if (newTask) { if (newTask) {
pSub->pBatchs = pTask->pBatchs; SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
SCtgMsgCtx *pSubMsgCtx = CTG_GET_TASK_MSGCTX(pSub, -1);
pSubMsgCtx->pBatchs = pMsgCtx->pBatchs;
CTG_ERR_RET((*gCtgAsyncFps[pSub->type].launchFp)(pSub)); CTG_ERR_RET((*gCtgAsyncFps[pSub->type].launchFp)(pSub));
pSub->status = CTG_TASK_LAUNCHED; pSub->status = CTG_TASK_LAUNCHED;
} }
...@@ -2180,7 +2264,6 @@ int32_t ctgLaunchJob(SCtgJob *pJob) { ...@@ -2180,7 +2264,6 @@ int32_t ctgLaunchJob(SCtgJob *pJob) {
for (int32_t i = 0; i < taskNum; ++i) { for (int32_t i = 0; i < taskNum; ++i) {
SCtgTask *pTask = taosArrayGet(pJob->pTasks, i); SCtgTask *pTask = taosArrayGet(pJob->pTasks, i);
pTask->pBatchs = pJob->pBatchs;
qDebug("QID:0x%" PRIx64 " ctg launch [%dth] task", pJob->queryId, pTask->taskId); qDebug("QID:0x%" PRIx64 " ctg launch [%dth] task", pJob->queryId, pTask->taskId);
CTG_ERR_RET((*gCtgAsyncFps[pTask->type].launchFp)(pTask)); CTG_ERR_RET((*gCtgAsyncFps[pTask->type].launchFp)(pTask));
......
...@@ -68,14 +68,14 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu ...@@ -68,14 +68,14 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
taskMsg.pData = NULL; taskMsg.pData = NULL;
taskMsg.len = 0; taskMsg.len = 0;
} }
pTask->pBatchs = pBatchs;
SCtgTaskReq tReq; SCtgTaskReq tReq;
tReq.pTask = pTask; tReq.pTask = pTask;
tReq.msgIdx = rsp.msgIdx; tReq.msgIdx = rsp.msgIdx;
SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq.msgIdx);
pMsgCtx->pBatchs = pBatchs;
ctgDebug("QID:0x%" PRIx64 " ctg task %d idx %d start to handle rsp %s", pJob->queryId, pTask->taskId, rsp.msgIdx, TMSG_INFO(taskMsg.msgType + 1)); ctgDebug("QID:0x%" PRIx64 " ctg task %d idx %d start to handle rsp %s, pBatchs: %p", pJob->queryId, pTask->taskId, rsp.msgIdx, TMSG_INFO(taskMsg.msgType + 1), pBatchs);
(*gCtgAsyncFps[pTask->type].handleRspFp)(&tReq, rsp.reqType, &taskMsg, (rsp.rspCode ? rsp.rspCode : rspCode)); (*gCtgAsyncFps[pTask->type].handleRspFp)(&tReq, rsp.reqType, &taskMsg, (rsp.rspCode ? rsp.rspCode : rspCode));
} }
...@@ -343,7 +343,9 @@ int32_t ctgHandleMsgCallback(void* param, SDataBuf* pMsg, int32_t rspCode) { ...@@ -343,7 +343,9 @@ int32_t ctgHandleMsgCallback(void* param, SDataBuf* pMsg, int32_t rspCode) {
ctgError("taosHashInit %d batch failed", CTG_DEFAULT_BATCH_NUM); ctgError("taosHashInit %d batch failed", CTG_DEFAULT_BATCH_NUM);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
} }
pTask->pBatchs = pBatchs;
SCtgMsgCtx *pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
pMsgCtx->pBatchs = pBatchs;
#endif #endif
SCtgTaskReq tReq; SCtgTaskReq tReq;
...@@ -444,7 +446,8 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT ...@@ -444,7 +446,8 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT
uint32_t msgSize) { uint32_t msgSize) {
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask; SCtgTask* pTask = tReq->pTask;
SHashObj* pBatchs = pTask->pBatchs; SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx);
SHashObj* pBatchs = pMsgCtx->pBatchs;
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
SCtgBatch* pBatch = taosHashGet(pBatchs, &vgId, sizeof(vgId)); SCtgBatch* pBatch = taosHashGet(pBatchs, &vgId, sizeof(vgId));
SCtgBatch newBatch = {0}; SCtgBatch newBatch = {0};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册