From dad8b812a7b70b89f1c53ac1347582bad8a257df Mon Sep 17 00:00:00 2001 From: dapan Date: Sat, 12 Mar 2022 17:19:50 +0800 Subject: [PATCH] feature/scheduler --- include/common/tmsg.h | 7 ++- include/libs/catalog/catalog.h | 2 + include/libs/qcom/query.h | 7 ++- source/client/src/clientImpl.c | 44 ++++++++++---- source/common/src/tmsg.c | 54 +++++++++++++++++ source/dnode/vnode/src/vnd/vnodeWrite.c | 15 ++++- source/libs/catalog/src/catalog.c | 65 ++++++++++++++------ source/libs/executor/src/executorimpl.c | 6 +- source/libs/scheduler/inc/schedulerInt.h | 2 +- source/libs/scheduler/src/scheduler.c | 77 ++++++++++++++++++++---- 10 files changed, 228 insertions(+), 51 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index a3d0ab3903..e92d47973e 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1375,13 +1375,16 @@ typedef struct { SArray* pArray; } SVCreateTbBatchReq; +int32_t tSerializeSVCreateTbBatchReq(void** buf, SVCreateTbBatchReq* pReq); +void* tDeserializeSVCreateTbBatchReq(void* buf, SVCreateTbBatchReq* pReq); typedef struct { SArray* rspList; // SArray } SVCreateTbBatchRsp; -int32_t tSerializeSVCreateTbBatchReq(void** buf, SVCreateTbBatchReq* pReq); -void* tDeserializeSVCreateTbBatchReq(void* buf, SVCreateTbBatchReq* pReq); +int32_t tSerializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatchRsp *pRsp); +int32_t tDeserializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatchRsp *pRsp); + typedef struct { int64_t ver; diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index a99a97f547..dd5f7fc104 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -112,6 +112,8 @@ int32_t catalogUpdateDBVgInfo(SCatalog* pCatalog, const char* dbName, uint64_t d int32_t catalogRemoveDB(SCatalog* pCatalog, const char* dbName, uint64_t dbId); +int32_t catalogRemoveTableMeta(SCatalog* pCtg, SName* pTableName); + int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId, const char* stbName, uint64_t suid); /** diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 70e93efee1..4e0a1e942c 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -181,8 +181,11 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t #define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE #define SET_META_TYPE_BOTH_TABLE(t) (t) = META_TYPE_BOTH_TABLE -#define IS_CLIENT_RETRY_ERROR(_code) ((_code) == TSDB_CODE_VND_HASH_MISMATCH) -#define IS_SCHEDULER_RETRY_ERROR(_code) ((_code) == TSDB_CODE_RPC_REDIRECT) +#define NEED_CLIENT_RM_TBLMETA_ERROR(_code) ((_code) == TSDB_CODE_TDB_INVALID_TABLE_ID || (_code) == TSDB_CODE_VND_TB_NOT_EXIST) +#define NEED_CLIENT_REFRESH_VG_ERROR(_code) ((_code) == TSDB_CODE_VND_HASH_MISMATCH) +#define NEED_CLIENT_HANDLE_ERROR(_code) (NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code)) + +#define NEED_SCHEDULER_RETRY_ERROR(_code) ((_code) == TSDB_CODE_RPC_REDIRECT) #define qFatal(...) \ do { \ diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 5bc7417955..84d9b54b62 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -299,9 +299,10 @@ int32_t clientProcessErrorList(SArray **pList) { SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) { SRequestObj* pRequest = NULL; int32_t code = 0; - bool quit = false; + int32_t needRetryNum = 0; + int32_t needRetryFailNum = 0; - while (!quit) { + while (true) { pRequest = execQueryImpl(pTscObj, sql, sqlLen); if (TSDB_CODE_SUCCESS == pRequest->code || NULL == pRequest->errList) { break; @@ -315,30 +316,49 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) { int32_t errNum = (int32_t)taosArrayGetSize(pRequest->errList); for (int32_t i = 0; i < errNum; ++i) { SQueryErrorInfo *errInfo = taosArrayGet(pRequest->errList, i); + int32_t tcode = 0; - if (TSDB_CODE_VND_HASH_MISMATCH == errInfo->code) { + if (NEED_CLIENT_REFRESH_VG_ERROR(errInfo->code)) { + ++needRetryNum; + SCatalog *pCatalog = NULL; - code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog); - if (code != TSDB_CODE_SUCCESS) { - quit = true; - break; + tcode = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog); + if (tcode != TSDB_CODE_SUCCESS) { + ++needRetryFailNum; + code = tcode; + continue; } + SEpSet epset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); char dbFName[TSDB_DB_FNAME_LEN]; tNameGetFullDbName(&errInfo->tableName, dbFName); - code = catalogRefreshDBVgInfo(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, dbFName); - if (code != TSDB_CODE_SUCCESS) { - quit = true; - break; + tcode = catalogRefreshDBVgInfo(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, dbFName); + if (tcode != TSDB_CODE_SUCCESS) { + ++needRetryFailNum; + code = tcode; + continue; + } + } else if (NEED_CLIENT_RM_TBLMETA_ERROR(errInfo->code)) { + SCatalog *pCatalog = NULL; + tcode = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog); + if (tcode != TSDB_CODE_SUCCESS) { + code = tcode; + continue; } + + catalogRemoveTableMeta(pCatalog, &errInfo->tableName); } } - if (!quit) { + if ((needRetryNum && (0 == needRetryFailNum) && (TDMT_VND_SUBMIT != pRequest->type && TDMT_VND_CREATE_TABLE != pRequest->type)) + || (needRetryNum && (needRetryNum > needRetryFailNum) && (TDMT_VND_SUBMIT == pRequest->type && TDMT_VND_CREATE_TABLE == pRequest->type))) { destroyRequest(pRequest); + continue; } + + break; } if (code) { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index a66674aef1..eed3d8fa37 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2656,6 +2656,60 @@ int32_t tDeserializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pR return 0; } +int32_t tSerializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatchRsp *pRsp) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (pRsp->rspList) { + int32_t num = taosArrayGetSize(pRsp->rspList); + if (tEncodeI32(&encoder, num) < 0) return -1; + for (int32_t i = 0; i < num; ++i) { + SVCreateTbRsp *rsp = taosArrayGet(pRsp->rspList, i); + if (tEncodeI32(&encoder, rsp->code) < 0) return -1; + if (tEncodeU8(&encoder, rsp->tableName.type) < 0) return -1; + if (tEncodeI32(&encoder, rsp->tableName.acctId) < 0) return -1; + if (tEncodeCStr(&encoder, rsp->tableName.dbname) < 0) return -1; + if (tEncodeCStr(&encoder, rsp->tableName.tname) < 0) return -1; + } + } else { + if (tEncodeI32(&encoder, 0) < 0) return -1; + } + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatchRsp *pRsp) { + SCoder decoder = {0}; + int32_t num = 0; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeI32(&decoder, &num) < 0) return -1; + if (num > 0) { + pRsp->rspList = taosArrayInit(num, sizeof(SVCreateTbRsp)); + if (NULL == pRsp->rspList) return -1; + for (int32_t i = 0; i < num; ++i) { + SVCreateTbRsp rsp = {0}; + if (tDecodeI32(&decoder, &rsp.code) < 0) return -1; + if (tDecodeU8(&decoder, &rsp.tableName.type) < 0) return -1; + if (tDecodeI32(&decoder, &rsp.tableName.acctId) < 0) return -1; + if (tDecodeCStrTo(&decoder, rsp.tableName.dbname) < 0) return -1; + if (tDecodeCStrTo(&decoder, rsp.tableName.tname) < 0) return -1; + if (NULL == taosArrayPush(pRsp->rspList, &rsp)) return -1; + } + } else { + pRsp->rspList = NULL; + } + tEndDecode(&decoder); + + tCoderClear(&decoder); + return 0; +} + int32_t tSerializeSVCreateTSmaReq(void **buf, SVCreateTSmaReq *pReq) { int32_t tlen = 0; diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index e7ba46d3ec..e9978f208e 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -87,7 +87,6 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { #if 0 char tableFName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(&pCreateTbReq->name, tableFName); - #endif int32_t code = vnodeValidateTableHash(&pVnode->config, tableFName); if (code) { @@ -107,6 +106,8 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { taosArrayPush(vCreateTbBatchRsp.rspList, &rsp); } + #endif + if (metaCreateTable(pVnode->pMeta, pCreateTbReq) < 0) { // TODO: handle error vError("vgId:%d, failed to create table: %s", pVnode->vgId, pCreateTbReq->name); @@ -125,7 +126,17 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { vTrace("vgId:%d process create %" PRIzu " tables", pVnode->vgId, taosArrayGetSize(vCreateTbBatchReq.pArray)); taosArrayDestroy(vCreateTbBatchReq.pArray); if (vCreateTbBatchRsp.rspList) { - + int32_t contLen = tSerializeSVCreateTbBatchRsp(NULL, 0, &vCreateTbBatchRsp); + void *msg = rpcMallocCont(contLen); + tSerializeSVCreateTbBatchRsp(msg, contLen, &vCreateTbBatchRsp); + taosArrayDestroy(vCreateTbBatchRsp.rspList); + + *pRsp = calloc(1, sizeof(SRpcMsg)); + (*pRsp)->msgType = TDMT_VND_CREATE_TABLE_RSP; + (*pRsp)->pCont = msg; + (*pRsp)->contLen = contLen; + (*pRsp)->handle = pMsg->handle; + (*pRsp)->ahandle = pMsg->ahandle; } break; } diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 3c12809ba7..853f3b52b1 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -851,18 +851,11 @@ int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STable return TSDB_CODE_SUCCESS; } -int32_t ctgGetTableTypeFromCache(SCatalog* pCtg, const SName* pTableName, int32_t *tbType, int32_t flag) { +int32_t ctgGetTableTypeFromCache(SCatalog* pCtg, const char* dbFName, const char *tableName, int32_t *tbType) { if (NULL == pCtg->dbCache) { - ctgWarn("empty db cache, tbName:%s", pTableName->tname); + ctgWarn("empty db cache, dbFName:%s, tbName:%s", dbFName, tableName); return TSDB_CODE_SUCCESS; } - - char dbFName[TSDB_DB_FNAME_LEN] = {0}; - if (CTG_FLAG_IS_INF_DB(flag)) { - strcpy(dbFName, pTableName->dbname); - } else { - tNameGetFullDbName(pTableName, dbFName); - } SCtgDBCache *dbCache = NULL; ctgAcquireDBCache(pCtg, dbFName, &dbCache); @@ -871,11 +864,11 @@ int32_t ctgGetTableTypeFromCache(SCatalog* pCtg, const SName* pTableName, int32_ } CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock); - STableMeta *pTableMeta = (STableMeta *)taosHashAcquire(dbCache->tbCache.metaCache, pTableName->tname, strlen(pTableName->tname)); + STableMeta *pTableMeta = (STableMeta *)taosHashAcquire(dbCache->tbCache.metaCache, tableName, strlen(tableName)); if (NULL == pTableMeta) { CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock); - ctgWarn("tbl not in cache, dbFName:%s, tbName:%s", dbFName, pTableName->tname); + ctgWarn("tbl not in cache, dbFName:%s, tbName:%s", dbFName, tableName); ctgReleaseDBCache(pCtg, dbCache); return TSDB_CODE_SUCCESS; @@ -889,7 +882,7 @@ int32_t ctgGetTableTypeFromCache(SCatalog* pCtg, const SName* pTableName, int32_ ctgReleaseDBCache(pCtg, dbCache); - ctgDebug("Got tbtype from cache, dbFName:%s, tbName:%s, type:%d", dbFName, pTableName->tname, *tbType); + ctgDebug("Got tbtype from cache, dbFName:%s, tbName:%s, type:%d", dbFName, tableName, *tbType); return TSDB_CODE_SUCCESS; } @@ -2074,24 +2067,19 @@ int32_t ctgActRemoveStb(SCtgMetaAction *action) { return TSDB_CODE_SUCCESS; } - if (dbCache->dbId != msg->dbId) { + if (msg->dbId && (dbCache->dbId != msg->dbId)) { ctgDebug("dbId already modified, dbFName:%s, current:%"PRIx64", dbId:%"PRIx64", stb:%s, suid:%"PRIx64, msg->dbFName, dbCache->dbId, msg->dbId, msg->stbName, msg->suid); return TSDB_CODE_SUCCESS; } CTG_LOCK(CTG_WRITE, &dbCache->tbCache.stbLock); if (taosHashRemove(dbCache->tbCache.stbCache, &msg->suid, sizeof(msg->suid))) { - CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock); ctgDebug("stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid); - return TSDB_CODE_SUCCESS; } CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock); if (taosHashRemove(dbCache->tbCache.metaCache, msg->stbName, strlen(msg->stbName))) { - CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock); - CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock); ctgError("stb not exist in cache, dbFName:%s, stb:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid); - CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock); @@ -2543,6 +2531,47 @@ _return: CTG_API_LEAVE(code); } +int32_t catalogRemoveTableMeta(SCatalog* pCtg, SName* pTableName) { + CTG_API_ENTER(); + + int32_t code = 0; + + if (NULL == pCtg || NULL == pTableName) { + CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); + } + + if (NULL == pCtg->dbCache) { + CTG_API_LEAVE(TSDB_CODE_SUCCESS); + } + + STableMeta *tblMeta = NULL; + int32_t exist = 0; + uint64_t dbId = 0; + CTG_ERR_JRET(ctgGetTableMetaFromCache(pCtg, pTableName, &tblMeta, &exist, 0, &dbId)); + + if (0 == exist) { + ctgDebug("table already not in cache, db:%s, tblName:%s", pTableName->dbname, pTableName->tname); + goto _return; + } + + char dbFName[TSDB_DB_FNAME_LEN]; + tNameGetFullDbName(pTableName, dbFName); + + if (TSDB_SUPER_TABLE == tblMeta->tableType) { + CTG_ERR_JRET(ctgPushRmStbMsgInQueue(pCtg, dbFName, dbId, pTableName->tname, tblMeta->suid)); + } else { + CTG_ERR_JRET(ctgPushRmTblMsgInQueue(pCtg, dbFName, dbId, pTableName->tname)); + } + + +_return: + + tfree(tblMeta); + + CTG_API_LEAVE(code); +} + + int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId, const char* stbName, uint64_t suid) { CTG_API_ENTER(); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 260d86030a..87079d9908 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -8114,7 +8114,11 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa size_t numOfCols = LIST_LENGTH(pScanPhyNode->pScanCols); tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, (uint64_t)queryId, taskId); - + if (NULL == pDataReader) { + errInfo->code = terrno; + errInfo->tableName = pScanPhyNode->tableName; + return NULL; + } int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId); return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pTaskInfo); diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index d8ee04ef2b..5f07d301f3 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -223,7 +223,7 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough); int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask); int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask); int32_t schFetchFromRemote(SSchJob *pJob); -int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode, SQueryErrorInfo *errInfo); +int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode, SArray *errList); #ifdef __cplusplus diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 6d4d2b393e..2e8f8a44d4 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -729,8 +729,8 @@ int32_t schProcessOnDataFetched(SSchJob *job) { tsem_post(&job->rspSem); } -int32_t schPushToErrInfoList(SSchJob *pJob, SSchTask *pTask, SQueryErrorInfo *errInfo) { - if (NULL == errInfo || !SCH_IS_DATA_SRC_TASK(pTask) || !IS_CLIENT_RETRY_ERROR(errInfo->code)) { +int32_t schPushToErrInfoList(SSchJob *pJob, SSchTask *pTask, SArray *errList) { + if (NULL == errList || !SCH_IS_DATA_SRC_TASK(pTask)) { return TSDB_CODE_SUCCESS; } @@ -743,10 +743,20 @@ int32_t schPushToErrInfoList(SSchJob *pJob, SSchTask *pTask, SQueryErrorInfo *er SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } } - - if (NULL == taosArrayPush(pJob->errList, errInfo)) { - SCH_TASK_ELOG("taosArrayPush errInfo to list failed, errCode:%x", errInfo->code); - SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + + SQueryErrorInfo *errInfo = NULL; + int32_t errNum = taosArrayGetSize(errList); + for (int32_t i = 0; i < errNum; ++i) { + errInfo = taosArrayGet(errList, i); + + if (!NEED_CLIENT_HANDLE_ERROR(errInfo->code)) { + continue; + } + + if (NULL == taosArrayPush(pJob->errList, errInfo)) { + SCH_TASK_ELOG("taosArrayPush errInfo to list failed, errCode:%x", errInfo->code); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } } return TSDB_CODE_SUCCESS; @@ -754,12 +764,13 @@ int32_t schPushToErrInfoList(SSchJob *pJob, SSchTask *pTask, SQueryErrorInfo *er // Note: no more task error processing, handled in function internal -int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode, SQueryErrorInfo *errInfo) { +int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode, SArray *errList) { int8_t status = 0; if (schJobNeedToStop(pJob, &status)) { SCH_TASK_DLOG("task failed not processed cause of job status, job status:%d", status); - + + taosArrayDestroy(errList); SCH_RET(atomic_load_32(&pJob->errCode)); } @@ -784,7 +795,7 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode, SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAILED); - SCH_ERR_JRET(schPushToErrInfoList(pJob, pTask, errInfo)); + SCH_ERR_JRET(schPushToErrInfoList(pJob, pTask, errList)); if (SCH_IS_WAIT_ALL_JOB(pJob)) { SCH_LOCK(SCH_WRITE, &pTask->level->lock); @@ -795,11 +806,14 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode, atomic_store_32(&pJob->errCode, errCode); if (taskDone < pTask->level->taskNum) { - SCH_TASK_DLOG("not all tasks done, done:%d, all:%d", taskDone, pTask->level->taskNum); - SCH_ERR_RET(errCode); + SCH_TASK_DLOG("need to wait other tasks, doneNum:%d, allNum:%d", taskDone, pTask->level->taskNum); + taosArrayDestroy(errList); + SCH_RET(errCode); } } } else { + taosArrayDestroy(errList); + SCH_ERR_JRET(schHandleTaskRetry(pJob, pTask)); return TSDB_CODE_SUCCESS; @@ -807,6 +821,8 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode, _return: + taosArrayDestroy(errList); + SCH_RET(schProcessOnJobFailure(pJob, errCode)); } @@ -924,6 +940,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch int8_t status = 0; bool errInfoGot = false; SQueryErrorInfo errInfo = {0}; + SArray *errList = NULL; if (schJobNeedToStop(pJob, &status)) { SCH_TASK_ELOG("rsp not processed cause of job status, job status:%d", status); @@ -935,6 +952,32 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch switch (msgType) { case TDMT_VND_CREATE_TABLE_RSP: { + SVCreateTbBatchRsp batchRsp = {0}; + if (msg) { + tDeserializeSVCreateTbBatchRsp(msg, msgSize, &batchRsp); + if (batchRsp.rspList) { + int32_t num = taosArrayGetSize(batchRsp.rspList); + errList = taosArrayInit(num, sizeof(SQueryErrorInfo)); + if (NULL == errList) { + SCH_TASK_ELOG("taskArrayInit %d errInfo failed", num); + taosArrayDestroy(batchRsp.rspList); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + for (int32_t i = 0; i < num; ++i) { + SVCreateTbRsp *rsp = taosArrayGet(batchRsp.rspList, i); + + errInfo.code = rsp->code; + errInfo.tableName = rsp->tableName; + + taosArrayPush(errList, &errInfo); + } + + taosArrayDestroy(batchRsp.rspList); + errInfoGot = true; + } + } + SCH_ERR_JRET(rspCode); SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); @@ -942,7 +985,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch } case TDMT_VND_SUBMIT_RSP: { #if 0 //TODO OPEN THIS - SShellSubmitRspMsg *rsp = (SShellSubmitRspMsg *)msg; + SSubmitRsp *rsp = (SSubmitRsp *)msg; if (rspCode != TSDB_CODE_SUCCESS || NULL == msg || rsp->code != TSDB_CODE_SUCCESS) { SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode)); @@ -969,6 +1012,14 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch if (rsp.code) { errInfo.code = rsp.code; errInfo.tableName = rsp.tableName; + + errList = taosArrayInit(1, sizeof(SQueryErrorInfo)); + if (NULL == errList) { + SCH_TASK_ELOG("taskArrayInit %d errInfo failed", 1); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + taosArrayPush(errList, &errInfo); errInfoGot = true; } @@ -1039,7 +1090,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch _return: - SCH_RET(schProcessOnTaskFailure(pJob, pTask, code, errInfoGot ? &errInfo : NULL)); + SCH_RET(schProcessOnTaskFailure(pJob, pTask, code, errInfoGot ? errList : NULL)); } -- GitLab