diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c
index aac9c25f779e864a86c300be10ce5756f99ab676..0eb05ccbe970517642f93431404ad17b3916c5df 100644
--- a/source/libs/planner/src/planPhysiCreater.c
+++ b/source/libs/planner/src/planPhysiCreater.c
@@ -552,6 +552,9 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
   if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TABLES) ||
       0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED)) {
     vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
+  } else {
+    pSubplan->execNode.nodeId = MNODE_HANDLE;
+    pSubplan->execNode.epSet = pCxt->pPlanCxt->mgmtEpSet;
   }
   SQueryNodeLoad node = {.addr = {.nodeId = MNODE_HANDLE, .epSet = pCxt->pPlanCxt->mgmtEpSet}, .load = 0};
   taosArrayPush(pCxt->pExecNodeList, &node);
diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h
index ce4b9eea19cc69839742e794d2e5755eeb146879..74b4dcf0765b36166801b99f0d88f0c0973b0501 100644
--- a/source/libs/scheduler/inc/schInt.h
+++ b/source/libs/scheduler/inc/schInt.h
@@ -55,8 +55,8 @@ typedef enum {
 } SCH_OP_TYPE;
 
 typedef enum {
-  SCH_EVENT_ENTER_API = 1,
-  SCH_EVENT_LEAVE_API,
+  SCH_EVENT_BEGIN_OP = 1,
+  SCH_EVENT_END_OP,
   SCH_EVENT_MSG,
   SCH_EVENT_DROP,
 } SCH_EVENT_TYPE;
@@ -111,6 +111,12 @@ typedef struct SSchResInfo {
   void* userParam;
 } SSchResInfo;
 
+typedef struct SSchOpEvent {
+  SCH_OP_TYPE    type;
+  bool           begin;
+  SSchedulerReq *pReq;
+} SSchOpEvent;
+
 typedef struct SSchEvent {
   SCH_EVENT_TYPE event;
   void* info;
diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c
index e137b2b001d411b1fcc1ece411ab6a547a26009a..893a836529448eef1356a40c6c337c3e08572b83 100644
--- a/source/libs/scheduler/src/schJob.c
+++ b/source/libs/scheduler/src/schJob.c
@@ -25,88 +25,6 @@ FORCE_INLINE SSchJob *schAcquireJob(int64_t refId) { qDebug("sch acquire jobId:0
 FORCE_INLINE int32_t schReleaseJob(int64_t refId) { qDebug("sch release jobId:0x%"PRIx64, refId); return taosReleaseRef(schMgmt.jobRef, refId); }
 
-int32_t schInitJob(SSchJob **pSchJob, SSchedulerReq *pReq) {
-  int32_t  code = 0;
-  int64_t  refId = -1;
-  SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
-  if (NULL == pJob) {
-    qError("QID:0x%" PRIx64 " calloc %d failed", pReq->pDag->queryId, (int32_t)sizeof(SSchJob));
-    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
-  }
-
-  pJob->attr.explainMode = pReq->pDag->explainInfo.mode;
-  pJob->conn = *pReq->pConn;
-  pJob->sql = pReq->sql;
-  pJob->pDag = pReq->pDag;
-  pJob->chkKillFp = pReq->chkKillFp;
-  pJob->chkKillParam = pReq->chkKillParam;
-  pJob->userRes.execFp = pReq->execFp;
-  pJob->userRes.userParam = pReq->execParam;
-  pJob->opStatus.op = SCH_OP_EXEC;
-  pJob->opStatus.syncReq = pReq->syncReq;
-
-  if (pReq->pNodeList == NULL || taosArrayGetSize(pReq->pNodeList) <= 0) {
-    qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pReq->pDag->queryId);
-  } else {
-    pJob->nodeList = taosArrayDup(pReq->pNodeList);
-  }
-
-  pJob->taskList =
-      taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
-  if (NULL == pJob->taskList) {
-    SCH_JOB_ELOG("taosHashInit %d taskList failed", pReq->pDag->numOfSubplans);
-    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
-  }
-
-  SCH_ERR_JRET(schValidateAndBuildJob(pReq->pDag, pJob));
-
-  if (SCH_IS_EXPLAIN_JOB(pJob)) {
-    SCH_ERR_JRET(qExecExplainBegin(pReq->pDag, &pJob->explainCtx, pReq->startTs));
-  }
-
-  pJob->execTasks =
-      taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
-  if (NULL == pJob->execTasks) {
-    SCH_JOB_ELOG("taosHashInit %d execTasks failed", pReq->pDag->numOfSubplans);
-    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
-  }
-
-  tsem_init(&pJob->rspSem, 0, 0);
-
-  refId = taosAddRef(schMgmt.jobRef, pJob);
-  if (refId < 0) {
-    SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno));
-    SCH_ERR_JRET(terrno);
-  }
-
-  atomic_add_fetch_32(&schMgmt.jobNum, 1);
-
-  if (NULL == schAcquireJob(refId)) {
-    SCH_JOB_ELOG("schAcquireJob job failed, refId:0x%" PRIx64, refId);
-    SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
-  }
-
-  pJob->refId = refId;
-
-  SCH_JOB_DLOG("job refId:0x%" PRIx64" created", pJob->refId);
-
-  *pSchJob = pJob;
-
-  return TSDB_CODE_SUCCESS;
-
-_return:
-
-  if (NULL == pJob) {
-    qDestroyQueryPlan(pReq->pDag);
-  } else if (refId < 0) {
-    schFreeJobImpl(pJob);
-  } else {
-    taosRemoveRef(schMgmt.jobRef, refId);
-  }
-
-  SCH_RET(code);
-}
-
 void schUpdateJobErrCode(SSchJob *pJob, int32_t errCode) {
   if (TSDB_CODE_SUCCESS == errCode) {
@@ -231,66 +149,6 @@ _return:
   SCH_RET(code);
 }
-
-void schEndOperation(SSchJob *pJob) {
-  int32_t op = atomic_load_32(&pJob->opStatus.op);
-  if (SCH_OP_NULL == op) {
-    SCH_JOB_DLOG("job already not in any operation, status:%s", jobTaskStatusStr(pJob->status));
-    return;
-  }
-
-  atomic_store_32(&pJob->opStatus.op, SCH_OP_NULL);
-
-  SCH_JOB_DLOG("job end %s operation", schGetOpStr(op));
-}
-
-int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync) {
-  int32_t code = 0;
-  int8_t status = 0;
-
-  if (schJobNeedToStop(pJob, &status)) {
-    SCH_JOB_ELOG("abort op %s cause of job need to stop", schGetOpStr(type));
-    SCH_ERR_JRET(pJob->errCode);
-  }
-
-  if (SCH_OP_NULL != atomic_val_compare_exchange_32(&pJob->opStatus.op, SCH_OP_NULL, type)) {
-    SCH_JOB_ELOG("job already in %s operation", schGetOpStr(pJob->opStatus.op));
-    SCH_ERR_JRET(TSDB_CODE_TSC_APP_ERROR);
-  }
-
-  SCH_JOB_DLOG("job start %s operation", schGetOpStr(pJob->opStatus.op));
-
-  pJob->opStatus.syncReq = sync;
-
-  switch (type) {
-    case SCH_OP_EXEC:
-      SCH_ERR_JRET(schUpdateJobStatus(pJob, JOB_TASK_STATUS_EXEC));
-      break;
-    case SCH_OP_FETCH:
-      if (!SCH_JOB_NEED_FETCH(pJob)) {
-        SCH_JOB_ELOG("no need to fetch data, status:%s", SCH_GET_JOB_STATUS_STR(pJob));
-        SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
-      }
-
-      if (status != JOB_TASK_STATUS_PART_SUCC) {
-        SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status));
-        SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
-      }
-      break;
-    default:
-      SCH_JOB_ELOG("unknown operation type %d", type);
-      SCH_ERR_JRET(TSDB_CODE_TSC_APP_ERROR);
-  }
-
-  return TSDB_CODE_SUCCESS;
-
-_return:
-
-  schEndOperation(pJob);
-
-  SCH_RET(code);
-}
-
 int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
   for (int32_t i = 0; i < pJob->levelNum; ++i) {
     SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
@@ -827,6 +685,89 @@ int32_t schJobFetchRowsA(SSchJob *pJob) {
   return TSDB_CODE_SUCCESS;
 }
+
+int32_t schInitJob(SSchJob **pSchJob, SSchedulerReq *pReq) {
+  int32_t  code = 0;
+  int64_t  refId = -1;
+  SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
+  if (NULL == pJob) {
+    qError("QID:0x%" PRIx64 " calloc %d failed", pReq->pDag->queryId, (int32_t)sizeof(SSchJob));
+    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
+  }
+
+  pJob->attr.explainMode = pReq->pDag->explainInfo.mode;
+  pJob->conn = *pReq->pConn;
+  pJob->sql = pReq->sql;
+  pJob->pDag = pReq->pDag;
+  pJob->chkKillFp = pReq->chkKillFp;
+  pJob->chkKillParam = pReq->chkKillParam;
+  pJob->userRes.execFp = pReq->execFp;
+  pJob->userRes.userParam = pReq->execParam;
+  pJob->opStatus.op = SCH_OP_EXEC;
+  pJob->opStatus.syncReq = pReq->syncReq;
+
+  if (pReq->pNodeList == NULL || taosArrayGetSize(pReq->pNodeList) <= 0) {
+    qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pReq->pDag->queryId);
+  } else {
+    pJob->nodeList = taosArrayDup(pReq->pNodeList);
+  }
+
+  pJob->taskList =
+      taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
+  if (NULL == pJob->taskList) {
+    SCH_JOB_ELOG("taosHashInit %d taskList failed", pReq->pDag->numOfSubplans);
+    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
+  }
+
+  SCH_ERR_JRET(schValidateAndBuildJob(pReq->pDag, pJob));
+
+  if (SCH_IS_EXPLAIN_JOB(pJob)) {
+    SCH_ERR_JRET(qExecExplainBegin(pReq->pDag, &pJob->explainCtx, pReq->startTs));
+  }
+
+  pJob->execTasks =
+      taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
+  if (NULL == pJob->execTasks) {
+    SCH_JOB_ELOG("taosHashInit %d execTasks failed", pReq->pDag->numOfSubplans);
+    SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
+  }
+
+  tsem_init(&pJob->rspSem, 0, 0);
+
+  refId = taosAddRef(schMgmt.jobRef, pJob);
+  if (refId < 0) {
+    SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno));
+    SCH_ERR_JRET(terrno);
+  }
+
+  atomic_add_fetch_32(&schMgmt.jobNum, 1);
+
+  if (NULL == schAcquireJob(refId)) {
+    SCH_JOB_ELOG("schAcquireJob job failed, refId:0x%" PRIx64, refId);
+    SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
+  }
+
+  pJob->refId = refId;
+
+  SCH_JOB_DLOG("job refId:0x%" PRIx64" created", pJob->refId);
+
+  *pSchJob = pJob;
+
+  return TSDB_CODE_SUCCESS;
+
+_return:
+
+  if (NULL == pJob) {
+    qDestroyQueryPlan(pReq->pDag);
+  } else if (refId < 0) {
+    schFreeJobImpl(pJob);
+  } else {
+    taosRemoveRef(schMgmt.jobRef, refId);
+  }
+
+  SCH_RET(code);
+}
+
 int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq) {
   int32_t code = 0;
   qDebug("QID:0x%" PRIx64 " sch job refId 0x%"PRIx64 " started", pReq->pDag->queryId, pJob->refId);
@@ -847,6 +788,69 @@ _return:
   SCH_RET(schProcessOnJobFailure(pJob, code));
 }
+
+void schProcessOnOpEnd(SSchJob *pJob) {
+  int32_t op = atomic_load_32(&pJob->opStatus.op);
+  if (SCH_OP_NULL == op) {
+    SCH_JOB_DLOG("job already not in any operation, status:%s", jobTaskStatusStr(pJob->status));
+    return;
+  }
+
+  atomic_store_32(&pJob->opStatus.op, SCH_OP_NULL);
+
+  SCH_JOB_DLOG("job end %s operation", schGetOpStr(op));
+}
+
+int32_t schProcessOnOpBegin(SSchJob* pJob, SSchEvent* pEvent) {
+  int32_t code = 0;
+  int8_t status = 0;
+  SSchOpEvent* pInfo = (SSchOpEvent*)pEvent->info;
+  SCH_OP_TYPE type = pInfo->type;
+  bool sync = (NULL == pInfo->pReq) ? false : pInfo->pReq->syncReq;
+
+  if (schJobNeedToStop(pJob, &status)) {
+    SCH_JOB_ELOG("abort op %s cause of job need to stop", schGetOpStr(type));
+    SCH_ERR_JRET(pJob->errCode);
+  }
+
+  if (SCH_OP_NULL != atomic_val_compare_exchange_32(&pJob->opStatus.op, SCH_OP_NULL, type)) {
+    SCH_JOB_ELOG("job already in %s operation", schGetOpStr(pJob->opStatus.op));
+    SCH_ERR_JRET(TSDB_CODE_TSC_APP_ERROR);
+  }
+
+  SCH_JOB_DLOG("job start %s operation", schGetOpStr(pJob->opStatus.op));
+
+  pJob->opStatus.syncReq = sync;
+
+  switch (type) {
+    case SCH_OP_EXEC:
+      SCH_ERR_JRET(schUpdateJobStatus(pJob, JOB_TASK_STATUS_EXEC));
+      break;
+    case SCH_OP_FETCH:
+      if (!SCH_JOB_NEED_FETCH(pJob)) {
+        SCH_JOB_ELOG("no need to fetch data, status:%s", SCH_GET_JOB_STATUS_STR(pJob));
+        SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
+      }
+
+      if (status != JOB_TASK_STATUS_PART_SUCC) {
+        SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status));
+        SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
+      }
+      break;
+    default:
+      SCH_JOB_ELOG("unknown operation type %d", type);
+      SCH_ERR_JRET(TSDB_CODE_TSC_APP_ERROR);
+  }
+
+  return TSDB_CODE_SUCCESS;
+
+_return:
+
+  schProcessOnOpEnd(pJob);
+
+  SCH_RET(code);
+}
+
+
 int32_t schJobStatusEnter(SSchJob** job, int32_t status, void* param) {
   SCH_ERR_RET(schUpdateJobStatus(*job, status));
@@ -866,9 +870,13 @@ int32_t schJobStatusEnter(SSchJob** job, int32_t status, void* param) {
   return TSDB_CODE_SUCCESS;
 }
 
-int32_t schJobStatusEvent() {
-
-  schEndOperation(pJob);
+int32_t schJobHandleEvent(SSchJob* pJob, SSchEvent* pEvent) {
+  switch (pEvent->event) {
+    case SCH_EVENT_BEGIN_OP:
+      SCH_ERR_RET(schProcessOnOpBegin(pJob, pEvent));
+      break;
+    case SCH_EVENT_END_OP:
+      schProcessOnOpEnd(pJob);
+      break;
+  }
+
+  return TSDB_CODE_SUCCESS;
+}
diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c
index ccbd1f4615142b6c0ae96996dc533016a1b501b5..0e1d749533d08f8672990ae1dda3b595a41324c3 100644
--- a/source/libs/scheduler/src/schTask.c
+++ b/source/libs/scheduler/src/schTask.c
@@ -601,6 +601,11 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
     return TSDB_CODE_SUCCESS;
   }
 
+  if (SCH_IS_DATA_SRC_QRY_TASK(pTask)) {
+    SCH_TASK_ELOG("no execNode specified for data src task, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);
+    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
+  }
+
   SCH_ERR_RET(schSetAddrsFromNodeList(pJob, pTask));
 
   /*