提交 9e00672c 编写于 作者: D dapan1121

enh: refactor scheduler code

上级 211ae46a
......@@ -552,6 +552,9 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TABLES) ||
0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED)) {
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
} else {
pSubplan->execNode.nodeId = MNODE_HANDLE;
pSubplan->execNode.epSet = pCxt->pPlanCxt->mgmtEpSet;
}
SQueryNodeLoad node = {.addr = {.nodeId = MNODE_HANDLE, .epSet = pCxt->pPlanCxt->mgmtEpSet}, .load = 0};
taosArrayPush(pCxt->pExecNodeList, &node);
......
......@@ -55,8 +55,8 @@ typedef enum {
} SCH_OP_TYPE;
typedef enum {
SCH_EVENT_ENTER_API = 1,
SCH_EVENT_LEAVE_API,
SCH_EVENT_BEGIN_OP = 1,
SCH_EVENT_END_OP,
SCH_EVENT_MSG,
SCH_EVENT_DROP,
} SCH_EVENT_TYPE;
......@@ -111,6 +111,12 @@ typedef struct SSchResInfo {
void* userParam;
} SSchResInfo;
typedef struct SSchOpEvent {
SCH_OP_TYPE type;
bool begin;
SSchedulerReq *pReq;
} SSchOpEvent;
typedef struct SSchEvent {
SCH_EVENT_TYPE event;
void* info;
......
......@@ -25,88 +25,6 @@ FORCE_INLINE SSchJob *schAcquireJob(int64_t refId) { qDebug("sch acquire jobId:0
FORCE_INLINE int32_t schReleaseJob(int64_t refId) { qDebug("sch release jobId:0x%"PRIx64, refId); return taosReleaseRef(schMgmt.jobRef, refId); }
int32_t schInitJob(SSchJob **pSchJob, SSchedulerReq *pReq) {
int32_t code = 0;
int64_t refId = -1;
SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
if (NULL == pJob) {
qError("QID:0x%" PRIx64 " calloc %d failed", pReq->pDag->queryId, (int32_t)sizeof(SSchJob));
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pJob->attr.explainMode = pReq->pDag->explainInfo.mode;
pJob->conn = *pReq->pConn;
pJob->sql = pReq->sql;
pJob->pDag = pReq->pDag;
pJob->chkKillFp = pReq->chkKillFp;
pJob->chkKillParam = pReq->chkKillParam;
pJob->userRes.execFp = pReq->execFp;
pJob->userRes.userParam = pReq->execParam;
pJob->opStatus.op = SCH_OP_EXEC;
pJob->opStatus.syncReq = pReq->syncReq;
if (pReq->pNodeList == NULL || taosArrayGetSize(pReq->pNodeList) <= 0) {
qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pReq->pDag->queryId);
} else {
pJob->nodeList = taosArrayDup(pReq->pNodeList);
}
pJob->taskList =
taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == pJob->taskList) {
SCH_JOB_ELOG("taosHashInit %d taskList failed", pReq->pDag->numOfSubplans);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SCH_ERR_JRET(schValidateAndBuildJob(pReq->pDag, pJob));
if (SCH_IS_EXPLAIN_JOB(pJob)) {
SCH_ERR_JRET(qExecExplainBegin(pReq->pDag, &pJob->explainCtx, pReq->startTs));
}
pJob->execTasks =
taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == pJob->execTasks) {
SCH_JOB_ELOG("taosHashInit %d execTasks failed", pReq->pDag->numOfSubplans);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
tsem_init(&pJob->rspSem, 0, 0);
refId = taosAddRef(schMgmt.jobRef, pJob);
if (refId < 0) {
SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno));
SCH_ERR_JRET(terrno);
}
atomic_add_fetch_32(&schMgmt.jobNum, 1);
if (NULL == schAcquireJob(refId)) {
SCH_JOB_ELOG("schAcquireJob job failed, refId:0x%" PRIx64, refId);
SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
}
pJob->refId = refId;
SCH_JOB_DLOG("job refId:0x%" PRIx64" created", pJob->refId);
*pSchJob = pJob;
return TSDB_CODE_SUCCESS;
_return:
if (NULL == pJob) {
qDestroyQueryPlan(pReq->pDag);
} else if (refId < 0) {
schFreeJobImpl(pJob);
} else {
taosRemoveRef(schMgmt.jobRef, refId);
}
SCH_RET(code);
}
void schUpdateJobErrCode(SSchJob *pJob, int32_t errCode) {
if (TSDB_CODE_SUCCESS == errCode) {
......@@ -231,66 +149,6 @@ _return:
SCH_RET(code);
}
void schEndOperation(SSchJob *pJob) {
int32_t op = atomic_load_32(&pJob->opStatus.op);
if (SCH_OP_NULL == op) {
SCH_JOB_DLOG("job already not in any operation, status:%s", jobTaskStatusStr(pJob->status));
return;
}
atomic_store_32(&pJob->opStatus.op, SCH_OP_NULL);
SCH_JOB_DLOG("job end %s operation", schGetOpStr(op));
}
int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync) {
int32_t code = 0;
int8_t status = 0;
if (schJobNeedToStop(pJob, &status)) {
SCH_JOB_ELOG("abort op %s cause of job need to stop", schGetOpStr(type));
SCH_ERR_JRET(pJob->errCode);
}
if (SCH_OP_NULL != atomic_val_compare_exchange_32(&pJob->opStatus.op, SCH_OP_NULL, type)) {
SCH_JOB_ELOG("job already in %s operation", schGetOpStr(pJob->opStatus.op));
SCH_ERR_JRET(TSDB_CODE_TSC_APP_ERROR);
}
SCH_JOB_DLOG("job start %s operation", schGetOpStr(pJob->opStatus.op));
pJob->opStatus.syncReq = sync;
switch (type) {
case SCH_OP_EXEC:
SCH_ERR_JRET(schUpdateJobStatus(pJob, JOB_TASK_STATUS_EXEC));
break;
case SCH_OP_FETCH:
if (!SCH_JOB_NEED_FETCH(pJob)) {
SCH_JOB_ELOG("no need to fetch data, status:%s", SCH_GET_JOB_STATUS_STR(pJob));
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
if (status != JOB_TASK_STATUS_PART_SUCC) {
SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status));
SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
}
break;
default:
SCH_JOB_ELOG("unknown operation type %d", type);
SCH_ERR_JRET(TSDB_CODE_TSC_APP_ERROR);
}
return TSDB_CODE_SUCCESS;
_return:
schEndOperation(pJob);
SCH_RET(code);
}
int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) {
for (int32_t i = 0; i < pJob->levelNum; ++i) {
SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
......@@ -827,6 +685,89 @@ int32_t schJobFetchRowsA(SSchJob *pJob) {
return TSDB_CODE_SUCCESS;
}
int32_t schInitJob(SSchJob **pSchJob, SSchedulerReq *pReq) {
int32_t code = 0;
int64_t refId = -1;
SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
if (NULL == pJob) {
qError("QID:0x%" PRIx64 " calloc %d failed", pReq->pDag->queryId, (int32_t)sizeof(SSchJob));
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pJob->attr.explainMode = pReq->pDag->explainInfo.mode;
pJob->conn = *pReq->pConn;
pJob->sql = pReq->sql;
pJob->pDag = pReq->pDag;
pJob->chkKillFp = pReq->chkKillFp;
pJob->chkKillParam = pReq->chkKillParam;
pJob->userRes.execFp = pReq->execFp;
pJob->userRes.userParam = pReq->execParam;
pJob->opStatus.op = SCH_OP_EXEC;
pJob->opStatus.syncReq = pReq->syncReq;
if (pReq->pNodeList == NULL || taosArrayGetSize(pReq->pNodeList) <= 0) {
qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pReq->pDag->queryId);
} else {
pJob->nodeList = taosArrayDup(pReq->pNodeList);
}
pJob->taskList =
taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == pJob->taskList) {
SCH_JOB_ELOG("taosHashInit %d taskList failed", pReq->pDag->numOfSubplans);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SCH_ERR_JRET(schValidateAndBuildJob(pReq->pDag, pJob));
if (SCH_IS_EXPLAIN_JOB(pJob)) {
SCH_ERR_JRET(qExecExplainBegin(pReq->pDag, &pJob->explainCtx, pReq->startTs));
}
pJob->execTasks =
taosHashInit(pReq->pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == pJob->execTasks) {
SCH_JOB_ELOG("taosHashInit %d execTasks failed", pReq->pDag->numOfSubplans);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
tsem_init(&pJob->rspSem, 0, 0);
refId = taosAddRef(schMgmt.jobRef, pJob);
if (refId < 0) {
SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno));
SCH_ERR_JRET(terrno);
}
atomic_add_fetch_32(&schMgmt.jobNum, 1);
if (NULL == schAcquireJob(refId)) {
SCH_JOB_ELOG("schAcquireJob job failed, refId:0x%" PRIx64, refId);
SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
}
pJob->refId = refId;
SCH_JOB_DLOG("job refId:0x%" PRIx64" created", pJob->refId);
*pSchJob = pJob;
return TSDB_CODE_SUCCESS;
_return:
if (NULL == pJob) {
qDestroyQueryPlan(pReq->pDag);
} else if (refId < 0) {
schFreeJobImpl(pJob);
} else {
taosRemoveRef(schMgmt.jobRef, refId);
}
SCH_RET(code);
}
int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq) {
int32_t code = 0;
qDebug("QID:0x%" PRIx64 " sch job refId 0x%"PRIx64 " started", pReq->pDag->queryId, pJob->refId);
......@@ -847,6 +788,69 @@ _return:
SCH_RET(schProcessOnJobFailure(pJob, code));
}
void schProcessOnOpEnd(SSchJob *pJob) {
int32_t op = atomic_load_32(&pJob->opStatus.op);
if (SCH_OP_NULL == op) {
SCH_JOB_DLOG("job already not in any operation, status:%s", jobTaskStatusStr(pJob->status));
return;
}
atomic_store_32(&pJob->opStatus.op, SCH_OP_NULL);
SCH_JOB_DLOG("job end %s operation", schGetOpStr(op));
}
int32_t schProcessOnOpBegin(SSchJob* pJob, SSchEvent* pEvent) {
int32_t code = 0;
int8_t status = 0;
SSchOpEvent* pInfo = (SSchOpEvent*)pEvent->info;
SCH_OP_TYPE type, bool sync;
if (schJobNeedToStop(pJob, &status)) {
SCH_JOB_ELOG("abort op %s cause of job need to stop", schGetOpStr(type));
SCH_ERR_JRET(pJob->errCode);
}
if (SCH_OP_NULL != atomic_val_compare_exchange_32(&pJob->opStatus.op, SCH_OP_NULL, type)) {
SCH_JOB_ELOG("job already in %s operation", schGetOpStr(pJob->opStatus.op));
SCH_ERR_JRET(TSDB_CODE_TSC_APP_ERROR);
}
SCH_JOB_DLOG("job start %s operation", schGetOpStr(pJob->opStatus.op));
pJob->opStatus.syncReq = sync;
switch (type) {
case SCH_OP_EXEC:
SCH_ERR_JRET(schUpdateJobStatus(pJob, JOB_TASK_STATUS_EXEC));
break;
case SCH_OP_FETCH:
if (!SCH_JOB_NEED_FETCH(pJob)) {
SCH_JOB_ELOG("no need to fetch data, status:%s", SCH_GET_JOB_STATUS_STR(pJob));
SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
if (status != JOB_TASK_STATUS_PART_SUCC) {
SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status));
SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
}
break;
default:
SCH_JOB_ELOG("unknown operation type %d", type);
SCH_ERR_JRET(TSDB_CODE_TSC_APP_ERROR);
}
return TSDB_CODE_SUCCESS;
_return:
schEndOperation(pJob);
SCH_RET(code);
}
int32_t schJobStatusEnter(SSchJob** job, int32_t status, void* param) {
SCH_ERR_RET(schUpdateJobStatus(*job, status));
......@@ -866,9 +870,13 @@ int32_t schJobStatusEnter(SSchJob** job, int32_t status, void* param) {
return TSDB_CODE_SUCCESS;
}
int32_t schJobStatusEvent() {
schEndOperation(pJob);
int32_t schJobHandleEvent(SSchJob* pJob, SSchEvent* pEvent) {
switch (pEvent->event) {
case SCH_EVENT_BEGIN_OP:
schProcessOnOpBegin(pJob, pEvent);
case SCH_EVENT_END_OP:
schProcessOnOpEnd(pJob);
}
}
......
......@@ -601,6 +601,11 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
return TSDB_CODE_SUCCESS;
}
if (SCH_IS_DATA_SRC_QRY_TASK(pTask)) {
SCH_TASK_ELOG("no execNode specifed for data src task, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps);
SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
SCH_ERR_RET(schSetAddrsFromNodeList(pJob, pTask));
/*
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册