From 43d79129873b3b050d076a44ccb69eb3aa6d6ded Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 15 Mar 2022 15:34:17 +0800 Subject: [PATCH] feature/scheduler --- include/common/tmsg.h | 9 +- include/libs/executor/executor.h | 2 +- include/libs/qcom/query.h | 8 +- include/libs/scheduler/scheduler.h | 1 - source/client/inc/clientInt.h | 3 +- source/client/src/clientImpl.c | 138 +++++----------- source/common/src/tmsg.c | 16 -- source/dnode/mgmt/impl/src/dndVnodes.c | 19 --- source/dnode/vnode/inc/meta.h | 1 + source/dnode/vnode/src/meta/metaBDBImpl.c | 12 ++ source/dnode/vnode/src/vnd/vnodeInt.c | 2 +- source/dnode/vnode/src/vnd/vnodeWrite.c | 12 +- source/libs/executor/inc/executorimpl.h | 2 +- source/libs/executor/src/executor.c | 2 +- source/libs/executor/src/executorMain.c | 4 +- source/libs/executor/src/executorimpl.c | 45 +----- source/libs/executor/test/executorTests.cpp | 2 +- source/libs/parser/inc/parInsertData.h | 1 - source/libs/parser/src/parInsert.c | 4 +- source/libs/parser/src/parTranslater.c | 6 +- source/libs/planner/src/planPhysiCreater.c | 7 +- source/libs/qworker/inc/qworkerMsg.h | 2 +- source/libs/qworker/src/qworker.c | 7 +- source/libs/qworker/src/qworkerMsg.c | 5 +- source/libs/qworker/test/qworkerTests.cpp | 2 +- source/libs/scheduler/inc/schedulerInt.h | 2 +- source/libs/scheduler/src/schFlowCtrl.c | 2 +- source/libs/scheduler/src/scheduler.c | 170 +++++--------------- 28 files changed, 124 insertions(+), 362 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index a78ba98e83..cf001428d9 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -190,20 +190,14 @@ typedef struct SEp { } SEp; typedef struct { - char dbFName[TSDB_DB_FNAME_LEN]; int32_t contLen; int32_t vgId; } SMsgHead; -typedef struct { - char dbFName[TSDB_DB_FNAME_LEN]; -} SRspHead; - // Submit message for one table typedef struct SSubmitBlk { int64_t uid; // table unique id int32_t tid; // table id - char tableName[TSDB_TABLE_NAME_LEN]; int32_t padding; // TODO just for padding here int32_t sversion; // data schema version int32_t dataLen; // data part length, not including the SSubmitBlk head @@ -466,7 +460,6 @@ typedef struct { typedef struct { int32_t code; - SName tableName; } SQueryTableRsp; int32_t tSerializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp); @@ -1356,6 +1349,7 @@ typedef struct { typedef struct SVCreateTbReq { int64_t ver; // use a general definition + char* dbFName; char* name; uint32_t ttl; uint32_t keep; @@ -1381,7 +1375,6 @@ typedef struct SVCreateTbReq { typedef struct { int32_t code; - SName tableName; int tmp; // TODO: to avoid compile error } SVCreateTbRsp, SVUpdateTbRsp; diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index e1729835de..a31f660852 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -69,7 +69,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isA * @param qId * @return */ -int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, SQueryErrorInfo *errInfo); +int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle); /** * The main task execution function, including query on both table and multiple tables, diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index fe506909d9..a7b682f49d 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -134,11 +134,6 @@ typedef struct SQueryNodeStat { int32_t tableNum; // vg table number, unit is TSDB_TABLE_NUM_UNIT } SQueryNodeStat; -typedef struct SQueryErrorInfo { - int32_t code; - SName tableName; -} SQueryErrorInfo; - int32_t initTaskQueue(); int32_t cleanupTaskQueue(); @@ -181,7 +176,6 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t #define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE #define SET_META_TYPE_BOTH_TABLE(t) (t) = META_TYPE_BOTH_TABLE -#define ONLY_RSP_HEAD_ERROR(_code) ((_code) == TSDB_CODE_VND_INVALID_VGROUP_ID) #define NEED_CLIENT_RM_TBLMETA_ERROR(_code) ((_code) == TSDB_CODE_TDB_INVALID_TABLE_ID || (_code) == TSDB_CODE_VND_TB_NOT_EXIST) #define NEED_CLIENT_REFRESH_VG_ERROR(_code) ((_code) == TSDB_CODE_VND_HASH_MISMATCH || (_code) == TSDB_CODE_VND_INVALID_VGROUP_ID) #define NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code) ((_code) == TSDB_CODE_TDB_TABLE_RECREATED) @@ -189,6 +183,8 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t #define NEED_SCHEDULER_RETRY_ERROR(_code) ((_code) == TSDB_CODE_RPC_REDIRECT) +#define REQUEST_MAX_RETRY_NUM 3 + #define qFatal(...) \ do { \ if (qDebugFlag & DEBUG_FATAL) { \ diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 2d4cbd4ac0..16a6ae32cf 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -53,7 +53,6 @@ typedef struct SQueryProfileSummary { typedef struct SQueryResult { int32_t code; - SArray *errList; // SArray uint64_t numOfRows; int32_t msgSize; char *msg; diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index bf2a530d61..ca07f75de0 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -183,7 +183,8 @@ typedef struct SRequestObj { char* msgBuf; void* pInfo; // sql parse info, generated by parser module int32_t code; - SArray* errList; // SArray + SArray* dbList; + SArray* tableList; SQueryExecMetric metric; SRequestSendRecvBody body; } SRequestObj; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 193c8fdf72..21ae132990 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -231,7 +231,6 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList schedulerFreeJob(pRequest->body.queryJob); } - pRequest->errList = res.errList; pRequest->code = code; terrno = code; return pRequest->code; @@ -245,7 +244,6 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList } } - pRequest->errList = res.errList; pRequest->code = res.code; terrno = res.code; return pRequest->code; @@ -277,125 +275,61 @@ _return: return pRequest; } -int32_t clientProcessErrorList(SArray **pList) { - SArray *errList = *pList; - int32_t errNum = (int32_t)taosArrayGetSize(errList); +int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) { + SCatalog *pCatalog = NULL; + int32_t code = 0; + int32_t dbNum = taosArrayGetSize(pRequest->dbList); + int32_t tblNum = taosArrayGetSize(pRequest->tableList); + + if (dbNum <= 0 && tblNum <= 0) { + return TSDB_CODE_QRY_APP_ERROR; + } - for (int32_t i = 0; i < errNum; ++i) { - SQueryErrorInfo *errInfo = taosArrayGet(errList, i); - if (NEED_CLIENT_REFRESH_VG_ERROR(errInfo->code)) { - if (i == (errNum - 1)) { - break; - } - - // TODO REMOVE SAME DB ERROR - } else if (NEED_CLIENT_REFRESH_TBLMETA_ERROR(errInfo->code) || NEED_CLIENT_RM_TBLMETA_ERROR(errInfo->code)) { - continue; - } else { - taosArrayRemove(errList, i); - --i; - --errNum; + code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + SEpSet epset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); + + for (int32_t i = 0; i < dbNum; ++i) { + char *dbFName = taosArrayGet(pRequest->dbList, i); + + code = catalogRefreshDBVgInfo(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, dbFName); + if (code != TSDB_CODE_SUCCESS) { + return code; } } - if (0 == errNum) { - taosArrayDestroy(*pList); - *pList = NULL; + for (int32_t i = 0; i < tblNum; ++i) { + SName *tableName = taosArrayGet(pRequest->tableList, i); + + code = catalogRefreshTableMeta(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, tableName, -1); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } - return TSDB_CODE_SUCCESS; + return code; } SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) { SRequestObj* pRequest = NULL; + int32_t retryNum = 0; int32_t code = 0; - int32_t needRetryNum = 0; - int32_t needRetryFailNum = 0; - while (true) { + while (retryNum++ < REQUEST_MAX_RETRY_NUM) { pRequest = execQueryImpl(pTscObj, sql, sqlLen); - if (TSDB_CODE_SUCCESS == pRequest->code || NULL == pRequest->errList) { + if (TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) { break; } - code = clientProcessErrorList(&pRequest->errList); - if (code != TSDB_CODE_SUCCESS || NULL == pRequest->errList) { + code = refreshMeta(pTscObj, pRequest); + if (code) { + pRequest->code = code; break; } - - int32_t errNum = (int32_t)taosArrayGetSize(pRequest->errList); - for (int32_t i = 0; i < errNum; ++i) { - SQueryErrorInfo *errInfo = taosArrayGet(pRequest->errList, i); - int32_t tcode = 0; - - if (NEED_CLIENT_REFRESH_VG_ERROR(errInfo->code)) { - ++needRetryNum; - - SCatalog *pCatalog = NULL; - tcode = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog); - if (tcode != TSDB_CODE_SUCCESS) { - ++needRetryFailNum; - code = tcode; - continue; - } - - SEpSet epset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); - - char dbFName[TSDB_DB_FNAME_LEN]; - tNameGetFullDbName(&errInfo->tableName, dbFName); - - tcode = catalogRefreshDBVgInfo(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, dbFName); - if (tcode != TSDB_CODE_SUCCESS) { - ++needRetryFailNum; - code = tcode; - continue; - } - } else if (NEED_CLIENT_RM_TBLMETA_ERROR(errInfo->code)) { - SCatalog *pCatalog = NULL; - tcode = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog); - if (tcode != TSDB_CODE_SUCCESS) { - code = tcode; - continue; - } - - catalogRemoveTableMeta(pCatalog, &errInfo->tableName); - } else if (NEED_CLIENT_REFRESH_TBLMETA_ERROR(errInfo->code)) { - ++needRetryNum; - - SCatalog *pCatalog = NULL; - tcode = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog); - if (tcode != TSDB_CODE_SUCCESS) { - ++needRetryFailNum; - code = tcode; - continue; - } - - SEpSet epset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); - - char dbFName[TSDB_DB_FNAME_LEN]; - tNameGetFullDbName(&errInfo->tableName, dbFName); - - tcode = catalogRefreshTableMeta(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, &errInfo->tableName, -1); - if (tcode != TSDB_CODE_SUCCESS) { - ++needRetryFailNum; - code = tcode; - continue; - } - } - } - - if ((needRetryNum && (0 == needRetryFailNum) && (TDMT_VND_SUBMIT != pRequest->type && TDMT_VND_CREATE_TABLE != pRequest->type)) - || (needRetryNum && (needRetryNum > needRetryFailNum) && (TDMT_VND_SUBMIT == pRequest->type && TDMT_VND_CREATE_TABLE == pRequest->type))) { - destroyRequest(pRequest); - continue; - } - - break; - } - - if (code) { - pRequest->code = code; } return pRequest; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 045ce13a1b..9d7082a02c 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2631,10 +2631,6 @@ int32_t tSerializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp if (tStartEncode(&encoder) < 0) return -1; if (tEncodeI32(&encoder, pRsp->code) < 0) return -1; - if (tEncodeI8(&encoder, pRsp->tableName.type) < 0) return -1; - if (tEncodeI32(&encoder, pRsp->tableName.acctId) < 0) return -1; - if (tEncodeCStr(&encoder, pRsp->tableName.dbname) < 0) return -1; - if (tEncodeCStr(&encoder, pRsp->tableName.tname) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -2648,10 +2644,6 @@ int32_t tDeserializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pR if (tStartDecode(&decoder) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->code) < 0) return -1; - if (tDecodeI8(&decoder, &pRsp->tableName.type) < 0) return -1; - if (tDecodeI32(&decoder, &pRsp->tableName.acctId) < 0) return -1; - if (tDecodeCStrTo(&decoder, pRsp->tableName.dbname) < 0) return -1; - if (tDecodeCStrTo(&decoder, pRsp->tableName.tname) < 0) return -1; tEndDecode(&decoder); tCoderClear(&decoder); @@ -2669,10 +2661,6 @@ int32_t tSerializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatchR for (int32_t i = 0; i < num; ++i) { SVCreateTbRsp *rsp = taosArrayGet(pRsp->rspList, i); if (tEncodeI32(&encoder, rsp->code) < 0) return -1; - if (tEncodeU8(&encoder, rsp->tableName.type) < 0) return -1; - if (tEncodeI32(&encoder, rsp->tableName.acctId) < 0) return -1; - if (tEncodeCStr(&encoder, rsp->tableName.dbname) < 0) return -1; - if (tEncodeCStr(&encoder, rsp->tableName.tname) < 0) return -1; } } else { if (tEncodeI32(&encoder, 0) < 0) return -1; @@ -2697,10 +2685,6 @@ int32_t tDeserializeSVCreateTbBatchRsp(void *buf, int32_t bufLen, SVCreateTbBatc for (int32_t i = 0; i < num; ++i) { SVCreateTbRsp rsp = {0}; if (tDecodeI32(&decoder, &rsp.code) < 0) return -1; - if (tDecodeU8(&decoder, &rsp.tableName.type) < 0) return -1; - if (tDecodeI32(&decoder, &rsp.tableName.acctId) < 0) return -1; - if (tDecodeCStrTo(&decoder, rsp.tableName.dbname) < 0) return -1; - if (tDecodeCStrTo(&decoder, rsp.tableName.tname) < 0) return -1; if (NULL == taosArrayPush(pRsp->rspList, &rsp)) return -1; } } else { diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index 713ab1282d..d311e1e417 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -805,24 +805,6 @@ static int32_t dndWriteRpcMsgToVnodeQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg, return code; } -static void dndGenerateResponseHead(SRpcMsg *pMsg, void **pRspHead, int *contLen) { - if (TDMT_VND_SUBMIT != pMsg->msgType && TDMT_VND_QUERY != pMsg->msgType - && TDMT_VND_CREATE_TABLE != pMsg->msgType && TDMT_VND_TABLE_META != pMsg->msgType) { - return; - } - - *pRspHead = rpcMallocCont(sizeof(SRspHead)); - if (NULL == *pRspHead) { - return; - } - - SMsgHead *pHead = pMsg->pCont; - - strcpy(((SRspHead *)(*pRspHead))->dbFName, pHead->dbFName); - - *contLen = sizeof(SRspHead); -} - static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) { SMsgHead *pHead = pMsg->pCont; pHead->contLen = htonl(pHead->contLen); @@ -833,7 +815,6 @@ static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) { dError("vgId:%d, failed to acquire vnode while process req", pHead->vgId); if (pMsg->msgType & 1u) { SRpcMsg rsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID}; - dndGenerateResponseHead(pMsg, &rsp.pCont, &rsp.contLen); rpcSendResponse(&rsp); } rpcFreeCont(pMsg->pCont); diff --git a/source/dnode/vnode/inc/meta.h b/source/dnode/vnode/inc/meta.h index 05749884d3..a48f437c97 100644 --- a/source/dnode/vnode/inc/meta.h +++ b/source/dnode/vnode/inc/meta.h @@ -61,6 +61,7 @@ STSchema * metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver); STSma * metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid); STSmaWrapper * metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid); SArray * metaGetSmaTbUids(SMeta *pMeta, bool isDup); +int metaGetTbNum(SMeta *pMeta); SMTbCursor *metaOpenTbCursor(SMeta *pMeta); void metaCloseTbCursor(SMTbCursor *pTbCur); diff --git a/source/dnode/vnode/src/meta/metaBDBImpl.c b/source/dnode/vnode/src/meta/metaBDBImpl.c index d9af526c2a..a9d87fa406 100644 --- a/source/dnode/vnode/src/meta/metaBDBImpl.c +++ b/source/dnode/vnode/src/meta/metaBDBImpl.c @@ -704,6 +704,18 @@ SMTbCursor *metaOpenTbCursor(SMeta *pMeta) { return pTbCur; } +int metaGetTbNum(SMeta *pMeta) { + SMetaDB *pDB = pMeta->pDB; + + DB_BTREE_STAT *sp1; + pDB->pTbDB->stat(pDB->pNtbIdx, NULL, &sp1, 0); + + DB_BTREE_STAT *sp2; + pDB->pTbDB->stat(pDB->pCtbIdx, NULL, &sp2, 0); + + return sp1->bt_nkeys + sp2->bt_nkeys; +} + void metaCloseTbCursor(SMTbCursor *pTbCur) { if (pTbCur) { if (pTbCur->pCur) { diff --git a/source/dnode/vnode/src/vnd/vnodeInt.c b/source/dnode/vnode/src/vnd/vnodeInt.c index 7d0b594e95..62c9738989 100644 --- a/source/dnode/vnode/src/vnd/vnodeInt.c +++ b/source/dnode/vnode/src/vnd/vnodeInt.c @@ -27,7 +27,7 @@ int32_t vnodeSync(SVnode *pVnode) { return 0; } int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { pLoad->vgId = pVnode->vgId; pLoad->role = TAOS_SYNC_STATE_LEADER; - pLoad->numOfTables = 500; + pLoad->numOfTables = metaGetTbNum(pVnode->pMeta); pLoad->numOfTimeSeries = 400; pLoad->totalStorage = 300; pLoad->compStorage = 200; diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index 96e0bf11b3..f82105aba0 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -85,22 +85,12 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { char tableFName[TSDB_TABLE_FNAME_LEN]; SMsgHead *pHead = (SMsgHead *)pMsg->pCont; - sprintf(tableFName, "%s.%s", pHead->dbFName, pCreateTbReq->name); + sprintf(tableFName, "%s.%s", pCreateTbReq->dbFName, pCreateTbReq->name); int32_t code = vnodeValidateTableHash(&pVnode->config, tableFName); if (code) { SVCreateTbRsp rsp; rsp.code = code; - tNameFromString(&rsp.tableName, tableFName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - - if (NULL == vCreateTbBatchRsp.rspList) { - vCreateTbBatchRsp.rspList = taosArrayInit(reqNum - i, sizeof(SVCreateTbRsp)); - if (NULL == vCreateTbBatchRsp.rspList) { - vError("vgId:%d, failed to init array: %d", pVnode->vgId, reqNum - i); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - } taosArrayPush(vCreateTbBatchRsp.rspList, &rsp); } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 3f31a21756..f7da27499e 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -711,7 +711,7 @@ int32_t getMaximumIdleDurationSec(); void doInvokeUdf(struct SUdfInfo* pUdfInfo, SqlFunctionCtx* pCtx, int32_t idx, int32_t type); void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); -int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, SQueryErrorInfo *errInfo); +int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId); #ifdef __cplusplus } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index ce50298add..a8602b7c77 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -84,7 +84,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) { } qTaskInfo_t pTaskInfo = NULL; - code = qCreateExecTask(streamReadHandle, 0, 0, plan, &pTaskInfo, NULL, NULL); + code = qCreateExecTask(streamReadHandle, 0, 0, plan, &pTaskInfo, NULL); if (code != TSDB_CODE_SUCCESS) { // TODO: destroy SSubplan & pTaskInfo terrno = code; diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index 48873dc552..7e55a4b3e1 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -51,11 +51,11 @@ static void freeqinfoFn(void *qhandle) { qDestroyTask(*handle); } -int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, SQueryErrorInfo *errInfo) { +int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) { assert(readHandle != NULL && pSubplan != NULL); SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo; - int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, errInfo); + int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId); if (code != TSDB_CODE_SUCCESS) { goto _error; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index a577f43655..1faa1c25a4 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -8126,7 +8126,7 @@ static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo); static SArray* extractScanColumnId(SNodeList* pNodeList); -SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo, SQueryErrorInfo *errInfo) { +SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) { if (nodeType(pPhyNode) == QUERY_NODE_PHYSICAL_PLAN_PROJECT) { // ignore the project node pPhyNode = nodesListGetNode(pPhyNode->pChildren, 0); } @@ -8135,39 +8135,9 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pPhyNode)) { SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; - if (TSDB_SUPER_TABLE != pScanPhyNode->tableType) { - char tableFName[TSDB_TABLE_FNAME_LEN]; - tNameExtractFullName(&pScanPhyNode->tableName, tableFName); - - int32_t code = vnodeValidateTableHash(pHandle->config, tableFName); - if (code) { - errInfo->code = code; - errInfo->tableName = pScanPhyNode->tableName; - return NULL; - } - } - - STbCfg* pTbCfg = metaGetTbInfoByUid(pHandle->meta, pScanPhyNode->uid); - if (pTbCfg == NULL) { - tb_uid_t uid = 0; - pTbCfg = metaGetTbInfoByName(pHandle->meta, pScanPhyNode->tableName.tname, &uid); - if (pTbCfg) { - errInfo->code = TSDB_CODE_TDB_TABLE_RECREATED; - errInfo->tableName = pScanPhyNode->tableName; - return NULL; - } - - errInfo->code = TSDB_CODE_TDB_INVALID_TABLE_ID; - errInfo->tableName = pScanPhyNode->tableName; - return NULL; - } - - size_t numOfCols = LIST_LENGTH(pScanPhyNode->pScanCols); tsdbReaderT pDataReader = doCreateDataReader((STableScanPhysiNode*)pPhyNode, pHandle, (uint64_t)queryId, taskId); if (NULL == pDataReader) { - errInfo->code = terrno; - errInfo->tableName = pScanPhyNode->tableName; return NULL; } int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId); @@ -8208,10 +8178,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa for (int32_t i = 0; i < size; ++i) { SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); - SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo, errInfo); - if (errInfo->code) { - return NULL; - } + SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); SArray* pExprInfo = createExprInfo((SAggPhysiNode*)pPhyNode); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); @@ -8330,7 +8297,7 @@ tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* return NULL; } -int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, SQueryErrorInfo *errInfo) { +int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId) { uint64_t queryId = pPlan->id.queryId; int32_t code = TSDB_CODE_SUCCESS; @@ -8341,9 +8308,9 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead } STableGroupInfo group = {0}; - (*pTaskInfo)->pRoot = doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &group, errInfo); - if (errInfo->code) { - code = errInfo->code; + (*pTaskInfo)->pRoot = doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &group); + if (NULL == (*pTaskInfo)->pRoot) { + code = terrno; goto _complete; } diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index 127e4f25e2..b3044cd2a4 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -1230,4 +1230,4 @@ TEST(testCase, time_interval_Operator_Test) { } #endif -#pragma GCC diagnostic pop +#pragma GCC diagnosti \ No newline at end of file diff --git a/source/libs/parser/inc/parInsertData.h b/source/libs/parser/inc/parInsertData.h index 53efd15f10..67ff2d1ae0 100644 --- a/source/libs/parser/inc/parInsertData.h +++ b/source/libs/parser/inc/parInsertData.h @@ -121,7 +121,6 @@ static FORCE_INLINE int32_t setBlockInfo(SSubmitBlk *pBlocks, STableDataBlocks* pBlocks->tid = dataBuf->pTableMeta->suid; pBlocks->uid = dataBuf->pTableMeta->uid; pBlocks->sversion = dataBuf->pTableMeta->sversion; - strcpy(pBlocks->tableName, dataBuf->tableName); if (pBlocks->numOfRows + numOfRows >= INT16_MAX) { return TSDB_CODE_TSC_INVALID_OPERATION; diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 57b8acfbda..b01b374696 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -1049,6 +1049,4 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) { if (TSDB_CODE_SUCCESS == code) { code = parseInsertBody(&context); } - destroyInsertParseContext(&context); - return code; -} + dest \ No newline at end of file diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 47588db7d7..8c44eafbd1 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -1743,8 +1743,4 @@ int32_t doTranslate(SParseContext* pParseCxt, SQuery* pQuery) { code = translateQuery(&cxt, pQuery->pRoot); } if (TSDB_CODE_SUCCESS == code) { - code = setQuery(&cxt, pQuery); - } - destroyTranslateContext(&cxt); - return code; -} + code = setQu \ No newline at end of file diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 8193859c9b..0a0e7b21eb 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -263,11 +263,12 @@ static SPhysiNode* createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* p pTableScan->scanRange = pScanLogicNode->scanRange; vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode); taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode); + pSubplan->execNodeStat.tableNum = pScanLogicNode->pVgroupList->vgroups[0].numOfTable; tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName); return (SPhysiNode*)pTableScan; } -static SPhysiNode* createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode) { +static SPhysiNode* createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode) { SSystemTableScanPhysiNode* pScan = (SSystemTableScanPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN); CHECK_ALLOC(pScan, NULL); CHECK_CODE(initScanPhysiNode(pCxt, pScanLogicNode, (SScanPhysiNode*)pScan), (SPhysiNode*)pScan); @@ -277,6 +278,7 @@ static SPhysiNode* createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SScan taosArrayPush(pCxt->pExecNodeList, &addr); } pScan->mgmtEpSet = pCxt->pPlanCxt->mgmtEpSet; + tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName); return (SPhysiNode*)pScan; } @@ -287,7 +289,7 @@ static SPhysiNode* createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubpl case SCAN_TYPE_TABLE: return createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode); case SCAN_TYPE_SYSTEM_TABLE: - return createSystemTableScanPhysiNode(pCxt, pScanLogicNode); + return createSystemTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode); case SCAN_TYPE_STREAM: break; default: @@ -840,4 +842,3 @@ int32_t createPhysiPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SQueryPlan** nodesDestroyNode(pLogicPlan); return code; } - \ No newline at end of file diff --git a/source/libs/qworker/inc/qworkerMsg.h b/source/libs/qworker/inc/qworkerMsg.h index f8d8ce4563..ecb5dbd654 100644 --- a/source/libs/qworker/inc/qworkerMsg.h +++ b/source/libs/qworker/inc/qworkerMsg.h @@ -36,7 +36,7 @@ int32_t qwBuildAndSendFetchRsp(void *connection, SRetrieveTableRsp *pRsp, int32_ void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete); int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, void *connection); int32_t qwBuildAndSendReadyRsp(void *connection, int32_t code); -int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code, SQueryErrorInfo *errInfo); +int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code); void qwFreeFetchRsp(void *msg); int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp); int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 901acca2ac..42890ab38a 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -998,7 +998,6 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) { qTaskInfo_t pTaskInfo = NULL; DataSinkHandle sinkHandle = NULL; SQWTaskCtx *ctx = NULL; - SQueryErrorInfo errInfo = {0}; QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, &output)); @@ -1020,7 +1019,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) { QW_ERR_JRET(code); } - code = qCreateExecTask(qwMsg->node, 0, tId, (struct SSubplan *)plan, &pTaskInfo, &sinkHandle, &errInfo); + code = qCreateExecTask(qwMsg->node, 0, tId, (struct SSubplan *)plan, &pTaskInfo, &sinkHandle); if (code) { QW_TASK_ELOG("qCreateExecTask failed, code:%s", tstrerror(code)); QW_ERR_JRET(code); @@ -1033,7 +1032,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) { //TODO OPTIMIZE EMTYP RESULT QUERY RSP TO AVOID FURTHER FETCH - QW_ERR_JRET(qwBuildAndSendQueryRsp(qwMsg->connection, code, NULL)); + QW_ERR_JRET(qwBuildAndSendQueryRsp(qwMsg->connection, code)); QW_TASK_DLOG("query msg rsped, code:%d", code); queryRsped = true; @@ -1052,7 +1051,7 @@ _return: } if (!queryRsped) { - qwBuildAndSendQueryRsp(qwMsg->connection, rspCode, &errInfo); + qwBuildAndSendQueryRsp(qwMsg->connection, rspCode); QW_TASK_DLOG("query msg rsped, code:%x", rspCode); } diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index ce39c710c8..c67b3ef4a9 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -44,12 +44,9 @@ void qwFreeFetchRsp(void *msg) { } } -int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code, SQueryErrorInfo *errInfo) { +int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code) { SRpcMsg *pMsg = (SRpcMsg *)connection; SQueryTableRsp rsp = {.code = code}; - if (errInfo && errInfo->code) { - rsp.tableName = errInfo->tableName; - } int32_t contLen = tSerializeSQueryTableRsp(NULL, 0, &rsp); void *msg = rpcMallocCont(contLen); diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp index 1d378ef4f9..0beccf1313 100644 --- a/source/libs/qworker/test/qworkerTests.cpp +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -1363,4 +1363,4 @@ int main(int argc, char** argv) { return RUN_ALL_TESTS(); } -#pragma GCC diagnostic pop +# \ No newline at end of file diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 5f07d301f3..2ccc18c176 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -223,7 +223,7 @@ int32_t schCheckIncTaskFlowQuota(SSchJob *pJob, SSchTask *pTask, bool *enough); int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask); int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask); int32_t schFetchFromRemote(SSchJob *pJob); -int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode, SArray *errList); +int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode); #ifdef __cplusplus diff --git a/source/libs/scheduler/src/schFlowCtrl.c b/source/libs/scheduler/src/schFlowCtrl.c index 4a2173561f..9fba6523b6 100644 --- a/source/libs/scheduler/src/schFlowCtrl.c +++ b/source/libs/scheduler/src/schFlowCtrl.c @@ -259,7 +259,7 @@ _return: SCH_UNLOCK(SCH_WRITE, &ctrl->lock); if (code) { - code = schProcessOnTaskFailure(pJob, pTask, code, NULL); + code = schProcessOnTaskFailure(pJob, pTask, code); } SCH_RET(code); diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 6c4c822763..e292513f8e 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -671,14 +671,42 @@ int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchHbTrans *trans) { return TSDB_CODE_SUCCESS; } -int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCode) { - // if already FAILED, no more processing - SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, status)); +void schUpdateJobErrCode(SSchJob *pJob, int32_t errCode) { + if (TSDB_CODE_SUCCESS == errCode) { + return; + } + + int32_t origCode = atomic_load_32(&pJob->errCode); + if (TSDB_CODE_SUCCESS == origCode) { + if (origCode == atomic_val_compare_exchange_32(&pJob->errCode, origCode, errCode)) { + goto _return; + } + + origCode = atomic_load_32(&pJob->errCode); + } + + if (NEED_CLIENT_HANDLE_ERROR(origCode)) { + return; + } - if (errCode) { + if (NEED_CLIENT_HANDLE_ERROR(errCode)) { atomic_store_32(&pJob->errCode, errCode); + goto _return; } + return; + +_return: + + SCH_JOB_DLOG("job errCode updated to %x - %s", errCode, tstrerror(errCode)); +} + +int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCode) { + // if already FAILED, no more processing + SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, status)); + + schUpdateJobErrCode(pJob, errCode); + if (atomic_load_8(&pJob->userFetch) || pJob->attr.syncSchedule) { tsem_post(&pJob->rspSem); } @@ -729,48 +757,12 @@ int32_t schProcessOnDataFetched(SSchJob *job) { tsem_post(&job->rspSem); } -int32_t schPushToErrInfoList(SSchJob *pJob, SSchTask *pTask, SArray *errList) { - if (NULL == errList || !SCH_IS_DATA_SRC_TASK(pTask)) { - return TSDB_CODE_SUCCESS; - } - - if (NULL == pJob->errList) { - SSchLevel *level = taosArrayGetLast(pJob->levels); - - pJob->errList = taosArrayInit(level->taskNum, sizeof(SQueryErrorInfo)); - if (NULL == pJob->errList) { - SCH_TASK_ELOG("taosArrayInit %d errInfofailed", pJob->taskNum); - SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } - } - - SQueryErrorInfo *errInfo = NULL; - int32_t errNum = taosArrayGetSize(errList); - for (int32_t i = 0; i < errNum; ++i) { - errInfo = taosArrayGet(errList, i); - - if (!NEED_CLIENT_HANDLE_ERROR(errInfo->code)) { - continue; - } - - if (NULL == taosArrayPush(pJob->errList, errInfo)) { - SCH_TASK_ELOG("taosArrayPush errInfo to list failed, errCode:%x", errInfo->code); - SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } - } - - return TSDB_CODE_SUCCESS; -} - - // Note: no more task error processing, handled in function internal -int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode, SArray *errList) { +int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) { int8_t status = 0; if (schJobNeedToStop(pJob, &status)) { SCH_TASK_DLOG("task failed not processed cause of job status, job status:%d", status); - - taosArrayDestroy(errList); SCH_RET(atomic_load_32(&pJob->errCode)); } @@ -794,8 +786,6 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode, } SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_FAILED); - - SCH_ERR_JRET(schPushToErrInfoList(pJob, pTask, errList)); if (SCH_IS_WAIT_ALL_JOB(pJob)) { SCH_LOCK(SCH_WRITE, &pTask->level->lock); @@ -803,17 +793,14 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode, taskDone = pTask->level->taskSucceed + pTask->level->taskFailed; SCH_UNLOCK(SCH_WRITE, &pTask->level->lock); - atomic_store_32(&pJob->errCode, errCode); + schUpdateJobErrCode(pJob, errCode); if (taskDone < pTask->level->taskNum) { SCH_TASK_DLOG("need to wait other tasks, doneNum:%d, allNum:%d", taskDone, pTask->level->taskNum); - taosArrayDestroy(errList); SCH_RET(errCode); } } } else { - taosArrayDestroy(errList); - SCH_ERR_JRET(schHandleTaskRetry(pJob, pTask)); return TSDB_CODE_SUCCESS; @@ -821,8 +808,6 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode, _return: - taosArrayDestroy(errList); - SCH_RET(schProcessOnJobFailure(pJob, errCode)); } @@ -930,30 +915,7 @@ _return: atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0); - SCH_RET(schProcessOnTaskFailure(pJob, pJob->fetchTask, code, NULL)); -} - -int32_t schRspHeadToErrList(SSchJob *pJob, SSchTask *pTask, int32_t errCode, SRspHead *head, SArray **errList) { - SQueryErrorInfo errInfo = {0}; - errInfo.code = errCode; - if (tNameFromString(&errInfo.tableName, head->dbFName, T_NAME_ACCT | T_NAME_DB)) { - SCH_TASK_ELOG("invalid rsp head, dbFName:%s", head->dbFName); - SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); - } - - *errList = taosArrayInit(1, sizeof(SQueryErrorInfo)); - if (NULL == *errList) { - SCH_TASK_ELOG("taskArrayInit %d errInfo failed", 1); - SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - if (NULL == taosArrayPush(*errList, &errInfo)) { - SCH_TASK_ELOG("taosArrayPush err to errList failed, dbFName:%s", head->dbFName); - taosArrayDestroy(*errList); - SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - return TSDB_CODE_SUCCESS; + SCH_RET(schProcessOnTaskFailure(pJob, pJob->fetchTask, code)); } @@ -961,9 +923,6 @@ int32_t schRspHeadToErrList(SSchJob *pJob, SSchTask *pTask, int32_t errCode, SRs int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) { int32_t code = 0; int8_t status = 0; - bool errInfoGot = false; - SQueryErrorInfo errInfo = {0}; - SArray *errList = NULL; if (schJobNeedToStop(pJob, &status)) { SCH_TASK_ELOG("rsp not processed cause of job status, job status:%d", status); @@ -977,33 +936,18 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch case TDMT_VND_CREATE_TABLE_RSP: { SVCreateTbBatchRsp batchRsp = {0}; if (msg) { - if (ONLY_RSP_HEAD_ERROR(rspCode)) { - SCH_ERR_JRET(schRspHeadToErrList(pJob, pTask, rspCode, (SRspHead *)msg, &errList)); - errInfoGot = true; - SCH_ERR_JRET(rspCode); - } - tDeserializeSVCreateTbBatchRsp(msg, msgSize, &batchRsp); if (batchRsp.rspList) { int32_t num = taosArrayGetSize(batchRsp.rspList); - errList = taosArrayInit(num, sizeof(SQueryErrorInfo)); - if (NULL == errList) { - SCH_TASK_ELOG("taskArrayInit %d errInfo failed", num); - taosArrayDestroy(batchRsp.rspList); - SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } - for (int32_t i = 0; i < num; ++i) { SVCreateTbRsp *rsp = taosArrayGet(batchRsp.rspList, i); - - errInfo.code = rsp->code; - errInfo.tableName = rsp->tableName; - - taosArrayPush(errList, &errInfo); + if (NEED_CLIENT_HANDLE_ERROR(rsp->code)) { + taosArrayDestroy(batchRsp.rspList); + SCH_ERR_JRET(rsp->code); + } } - + taosArrayDestroy(batchRsp.rspList); - errInfoGot = true; } } @@ -1014,12 +958,6 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch } case TDMT_VND_SUBMIT_RSP: { if (msg) { - if (ONLY_RSP_HEAD_ERROR(rspCode)) { - SCH_ERR_JRET(schRspHeadToErrList(pJob, pTask, rspCode, (SRspHead *)msg, &errList)); - errInfoGot = true; - SCH_ERR_JRET(rspCode); - } - SSubmitRsp *rsp = (SSubmitRsp *)msg; SCH_ERR_JRET(rsp->code); @@ -1036,27 +974,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch case TDMT_VND_QUERY_RSP: { SQueryTableRsp rsp = {0}; if (msg) { - if (ONLY_RSP_HEAD_ERROR(rspCode)) { - SCH_ERR_JRET(schRspHeadToErrList(pJob, pTask, rspCode, (SRspHead *)msg, &errList)); - errInfoGot = true; - SCH_ERR_JRET(rspCode); - } - tDeserializeSQueryTableRsp(msg, msgSize, &rsp); - if (rsp.code) { - errInfo.code = rsp.code; - errInfo.tableName = rsp.tableName; - - errList = taosArrayInit(1, sizeof(SQueryErrorInfo)); - if (NULL == errList) { - SCH_TASK_ELOG("taskArrayInit %d errInfo failed", 1); - SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - taosArrayPush(errList, &errInfo); - errInfoGot = true; - } - SCH_ERR_JRET(rsp.code); } @@ -1124,7 +1042,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch _return: - SCH_RET(schProcessOnTaskFailure(pJob, pTask, code, errInfoGot ? errList : NULL)); + SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); } @@ -1360,7 +1278,6 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, SSubQueryMsg *pMsg = msg; pMsg->header.vgId = htonl(addr->nodeId); - strcpy(pMsg->header.dbFName, pTask->plan->dbFName); pMsg->sId = htobe64(schMgmt.sId); pMsg->queryId = htobe64(pJob->queryId); pMsg->taskId = htobe64(pTask->taskId); @@ -1550,7 +1467,7 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) { _return: - SCH_RET(schProcessOnTaskFailure(pJob, pTask, code, NULL)); + SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); } int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) { @@ -1666,7 +1583,6 @@ void schFreeJobImpl(void *job) { taosArrayDestroy(pJob->levels); taosArrayDestroy(pJob->nodeList); - taosArrayDestroy(pJob->errList); tfree(pJob->resData); @@ -1806,8 +1722,6 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan* pDag, in pRes->code = atomic_load_32(&job->errCode); pRes->numOfRows = job->resNumOfRows; - pRes->errList = job->errList; - job->errList = NULL; schReleaseJob(*pJob); -- GitLab