提交 cd90a9fa 编写于 作者: D dapan1121

add error code for each task

上级 9fd58679
...@@ -71,16 +71,21 @@ typedef struct SCatalogReq { ...@@ -71,16 +71,21 @@ typedef struct SCatalogReq {
bool forceUpdate; bool forceUpdate;
} SCatalogReq; } SCatalogReq;
typedef struct SMetaRes {
int32_t code;
void* pRes;
} SMetaRes;
typedef struct SMetaData { typedef struct SMetaData {
SArray *pDbVgroup; // SArray<SArray<SVgroupInfo>*> SArray *pDbVgroup; // pRes = SArray<SVgroupInfo>*
SArray *pDbCfg; // SArray<SDbCfgInfo> SArray *pDbCfg; // pRes = SDbCfgInfo*
SArray *pDbInfo; // SArray<SDbInfo> SArray *pDbInfo; // pRes = SDbInfo*
SArray *pTableMeta; // SArray<STableMeta*> SArray *pTableMeta; // pRes = STableMeta*
SArray *pTableHash; // SArray<SVgroupInfo> SArray *pTableHash; // pRes = SVgroupInfo*
SArray *pUdfList; // SArray<SFuncInfo> SArray *pUdfList; // pRes = SFuncInfo*
SArray *pIndex; // SArray<SIndexInfo> SArray *pIndex; // pRes = SIndexInfo*
SArray *pUser; // SArray<bool> SArray *pUser; // pRes = bool*
SArray *pQnodeList; // SArray<SQueryNodeAddr> SArray *pQnodeList; // pRes = SQueryNodeAddr*
} SMetaData; } SMetaData;
typedef struct SCatalogCfg { typedef struct SCatalogCfg {
......
...@@ -173,7 +173,6 @@ typedef struct SCtgJob { ...@@ -173,7 +173,6 @@ typedef struct SCtgJob {
SArray* pTasks; SArray* pTasks;
int32_t taskDone; int32_t taskDone;
SMetaData jobRes; SMetaData jobRes;
int32_t rspCode;
uint64_t queryId; uint64_t queryId;
SCatalog* pCtg; SCatalog* pCtg;
...@@ -201,11 +200,12 @@ typedef struct SCtgMsgCtx { ...@@ -201,11 +200,12 @@ typedef struct SCtgMsgCtx {
typedef struct SCtgTask { typedef struct SCtgTask {
CTG_TASK_TYPE type; CTG_TASK_TYPE type;
int32_t taskId; int32_t taskId;
SCtgJob *pJob; SCtgJob* pJob;
void* taskCtx; void* taskCtx;
SCtgMsgCtx msgCtx; SCtgMsgCtx msgCtx;
void* res; int32_t code;
void* res;
} SCtgTask; } SCtgTask;
typedef int32_t (*ctgLanchTaskFp)(SCtgTask*); typedef int32_t (*ctgLanchTaskFp)(SCtgTask*);
......
...@@ -437,13 +437,14 @@ _return: ...@@ -437,13 +437,14 @@ _return:
int32_t ctgDumpTbMetaRes(SCtgTask* pTask) { int32_t ctgDumpTbMetaRes(SCtgTask* pTask) {
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
if (NULL == pJob->jobRes.pTableMeta) { if (NULL == pJob->jobRes.pTableMeta) {
pJob->jobRes.pTableMeta = taosArrayInit(pJob->tbMetaNum, POINTER_BYTES); pJob->jobRes.pTableMeta = taosArrayInit(pJob->tbMetaNum, sizeof(SMetaRes));
if (NULL == pJob->jobRes.pTableMeta) { if (NULL == pJob->jobRes.pTableMeta) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
} }
taosArrayPush(pJob->jobRes.pTableMeta, &pTask->res); SMetaRes res = {.code = pTask->code, .pRes = pTask->res};
taosArrayPush(pJob->jobRes.pTableMeta, &res);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -451,14 +452,14 @@ int32_t ctgDumpTbMetaRes(SCtgTask* pTask) { ...@@ -451,14 +452,14 @@ int32_t ctgDumpTbMetaRes(SCtgTask* pTask) {
int32_t ctgDumpDbVgRes(SCtgTask* pTask) { int32_t ctgDumpDbVgRes(SCtgTask* pTask) {
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
if (NULL == pJob->jobRes.pDbVgroup) { if (NULL == pJob->jobRes.pDbVgroup) {
pJob->jobRes.pDbVgroup = taosArrayInit(pJob->dbVgNum, POINTER_BYTES); pJob->jobRes.pDbVgroup = taosArrayInit(pJob->dbVgNum, sizeof(SMetaRes));
if (NULL == pJob->jobRes.pDbVgroup) { if (NULL == pJob->jobRes.pDbVgroup) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
} }
taosArrayPush(pJob->jobRes.pDbVgroup, &pTask->res); SMetaRes res = {.code = pTask->code, .pRes = pTask->res};
pTask->res = NULL; taosArrayPush(pJob->jobRes.pDbVgroup, &res);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -466,13 +467,14 @@ int32_t ctgDumpDbVgRes(SCtgTask* pTask) { ...@@ -466,13 +467,14 @@ int32_t ctgDumpDbVgRes(SCtgTask* pTask) {
int32_t ctgDumpTbHashRes(SCtgTask* pTask) { int32_t ctgDumpTbHashRes(SCtgTask* pTask) {
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
if (NULL == pJob->jobRes.pTableHash) { if (NULL == pJob->jobRes.pTableHash) {
pJob->jobRes.pTableHash = taosArrayInit(pJob->tbHashNum, sizeof(SVgroupInfo)); pJob->jobRes.pTableHash = taosArrayInit(pJob->tbHashNum, sizeof(SMetaRes));
if (NULL == pJob->jobRes.pTableHash) { if (NULL == pJob->jobRes.pTableHash) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
} }
taosArrayPush(pJob->jobRes.pTableHash, pTask->res); SMetaRes res = {.code = pTask->code, .pRes = pTask->res};
taosArrayPush(pJob->jobRes.pTableHash, &res);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -480,21 +482,29 @@ int32_t ctgDumpTbHashRes(SCtgTask* pTask) { ...@@ -480,21 +482,29 @@ int32_t ctgDumpTbHashRes(SCtgTask* pTask) {
int32_t ctgDumpIndexRes(SCtgTask* pTask) { int32_t ctgDumpIndexRes(SCtgTask* pTask) {
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
if (NULL == pJob->jobRes.pIndex) { if (NULL == pJob->jobRes.pIndex) {
pJob->jobRes.pIndex = taosArrayInit(pJob->indexNum, sizeof(SIndexInfo)); pJob->jobRes.pIndex = taosArrayInit(pJob->indexNum, sizeof(SMetaRes));
if (NULL == pJob->jobRes.pIndex) { if (NULL == pJob->jobRes.pIndex) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
} }
taosArrayPush(pJob->jobRes.pIndex, pTask->res); SMetaRes res = {.code = pTask->code, .pRes = pTask->res};
taosArrayPush(pJob->jobRes.pIndex, &res);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgDumpQnodeRes(SCtgTask* pTask) { int32_t ctgDumpQnodeRes(SCtgTask* pTask) {
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
if (NULL == pJob->jobRes.pQnodeList) {
pJob->jobRes.pQnodeList = taosArrayInit(1, sizeof(SMetaRes));
if (NULL == pJob->jobRes.pQnodeList) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
}
TSWAP(pJob->jobRes.pQnodeList, pTask->res); SMetaRes res = {.code = pTask->code, .pRes = pTask->res};
taosArrayPush(pJob->jobRes.pQnodeList, &res);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -502,13 +512,14 @@ int32_t ctgDumpQnodeRes(SCtgTask* pTask) { ...@@ -502,13 +512,14 @@ int32_t ctgDumpQnodeRes(SCtgTask* pTask) {
int32_t ctgDumpDbCfgRes(SCtgTask* pTask) { int32_t ctgDumpDbCfgRes(SCtgTask* pTask) {
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
if (NULL == pJob->jobRes.pDbCfg) { if (NULL == pJob->jobRes.pDbCfg) {
pJob->jobRes.pDbCfg = taosArrayInit(pJob->dbCfgNum, sizeof(SDbCfgInfo)); pJob->jobRes.pDbCfg = taosArrayInit(pJob->dbCfgNum, sizeof(SMetaRes));
if (NULL == pJob->jobRes.pDbCfg) { if (NULL == pJob->jobRes.pDbCfg) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
} }
taosArrayPush(pJob->jobRes.pDbCfg, pTask->res); SMetaRes res = {.code = pTask->code, .pRes = pTask->res};
taosArrayPush(pJob->jobRes.pDbCfg, &res);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -516,13 +527,14 @@ int32_t ctgDumpDbCfgRes(SCtgTask* pTask) { ...@@ -516,13 +527,14 @@ int32_t ctgDumpDbCfgRes(SCtgTask* pTask) {
int32_t ctgDumpDbInfoRes(SCtgTask* pTask) { int32_t ctgDumpDbInfoRes(SCtgTask* pTask) {
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
if (NULL == pJob->jobRes.pDbInfo) { if (NULL == pJob->jobRes.pDbInfo) {
pJob->jobRes.pDbInfo = taosArrayInit(pJob->dbInfoNum, sizeof(SDbInfo)); pJob->jobRes.pDbInfo = taosArrayInit(pJob->dbInfoNum, sizeof(SMetaRes));
if (NULL == pJob->jobRes.pDbInfo) { if (NULL == pJob->jobRes.pDbInfo) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
} }
taosArrayPush(pJob->jobRes.pDbInfo, pTask->res); SMetaRes res = {.code = pTask->code, .pRes = pTask->res};
taosArrayPush(pJob->jobRes.pDbInfo, &res);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -530,13 +542,14 @@ int32_t ctgDumpDbInfoRes(SCtgTask* pTask) { ...@@ -530,13 +542,14 @@ int32_t ctgDumpDbInfoRes(SCtgTask* pTask) {
int32_t ctgDumpUdfRes(SCtgTask* pTask) { int32_t ctgDumpUdfRes(SCtgTask* pTask) {
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
if (NULL == pJob->jobRes.pUdfList) { if (NULL == pJob->jobRes.pUdfList) {
pJob->jobRes.pUdfList = taosArrayInit(pJob->udfNum, sizeof(SFuncInfo)); pJob->jobRes.pUdfList = taosArrayInit(pJob->udfNum, sizeof(SMetaRes));
if (NULL == pJob->jobRes.pUdfList) { if (NULL == pJob->jobRes.pUdfList) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
} }
taosArrayPush(pJob->jobRes.pUdfList, pTask->res); SMetaRes res = {.code = pTask->code, .pRes = pTask->res};
taosArrayPush(pJob->jobRes.pUdfList, &res);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -544,13 +557,14 @@ int32_t ctgDumpUdfRes(SCtgTask* pTask) { ...@@ -544,13 +557,14 @@ int32_t ctgDumpUdfRes(SCtgTask* pTask) {
int32_t ctgDumpUserRes(SCtgTask* pTask) { int32_t ctgDumpUserRes(SCtgTask* pTask) {
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
if (NULL == pJob->jobRes.pUser) { if (NULL == pJob->jobRes.pUser) {
pJob->jobRes.pUser = taosArrayInit(pJob->userNum, sizeof(bool)); pJob->jobRes.pUser = taosArrayInit(pJob->userNum, sizeof(SMetaRes));
if (NULL == pJob->jobRes.pUser) { if (NULL == pJob->jobRes.pUser) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
} }
taosArrayPush(pJob->jobRes.pUser, pTask->res); SMetaRes res = {.code = pTask->code, .pRes = pTask->res};
taosArrayPush(pJob->jobRes.pUser, &res);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -561,14 +575,7 @@ int32_t ctgHandleTaskEnd(SCtgTask* pTask, int32_t rspCode) { ...@@ -561,14 +575,7 @@ int32_t ctgHandleTaskEnd(SCtgTask* pTask, int32_t rspCode) {
qDebug("QID:0x%" PRIx64 " task %d end with rsp %s", pJob->queryId, pTask->taskId, tstrerror(rspCode)); qDebug("QID:0x%" PRIx64 " task %d end with rsp %s", pJob->queryId, pTask->taskId, tstrerror(rspCode));
if (rspCode) { pTask->code = rspCode;
int32_t lastCode = atomic_val_compare_exchange_32(&pJob->rspCode, 0, rspCode);
if (0 == lastCode) {
CTG_ERR_JRET(rspCode);
}
return TSDB_CODE_SUCCESS;
}
int32_t taskDone = atomic_add_fetch_32(&pJob->taskDone, 1); int32_t taskDone = atomic_add_fetch_32(&pJob->taskDone, 1);
if (taskDone < taosArrayGetSize(pJob->pTasks)) { if (taskDone < taosArrayGetSize(pJob->pTasks)) {
......
...@@ -61,10 +61,12 @@ void ctgFreeSMetaData(SMetaData* pData) { ...@@ -61,10 +61,12 @@ void ctgFreeSMetaData(SMetaData* pData) {
taosArrayDestroy(pData->pUdfList); taosArrayDestroy(pData->pUdfList);
pData->pUdfList = NULL; pData->pUdfList = NULL;
/*
for (int32_t i = 0; i < taosArrayGetSize(pData->pDbCfg); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pData->pDbCfg); ++i) {
SDbCfgInfo* pInfo = taosArrayGet(pData->pDbCfg, i); SDbCfgInfo* pInfo = taosArrayGet(pData->pDbCfg, i);
taosArrayDestroy(pInfo->pRetensions); taosArrayDestroy(pInfo->pRetensions);
} }
*/
taosArrayDestroy(pData->pDbCfg); taosArrayDestroy(pData->pDbCfg);
pData->pDbCfg = NULL; pData->pDbCfg = NULL;
...@@ -320,8 +322,12 @@ void ctgFreeTask(SCtgTask* pTask) { ...@@ -320,8 +322,12 @@ void ctgFreeTask(SCtgTask* pTask) {
break; break;
} }
case CTG_TASK_GET_DB_CFG: { case CTG_TASK_GET_DB_CFG: {
taosMemoryFreeClear(pTask->taskCtx); taosMemoryFreeClear(pTask->taskCtx);
taosMemoryFreeClear(pTask->res); if (pTask->res) {
SDbCfgInfo* pInfo = (SDbCfgInfo*)pTask->res;
taosArrayDestroy(pInfo->pRetensions);
taosMemoryFreeClear(pTask->res);
}
break; break;
} }
case CTG_TASK_GET_DB_INFO: { case CTG_TASK_GET_DB_INFO: {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册