提交 97da2ee9 编写于 作者: D dapan1121

modify api and compile warning

上级 35f76a3c
...@@ -58,7 +58,7 @@ typedef struct SQueryProfileSummary { ...@@ -58,7 +58,7 @@ typedef struct SQueryProfileSummary {
*/ */
int32_t scheduleQueryJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pMgmtEps, SQueryDag* pDag, void** pJob); int32_t scheduleQueryJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pMgmtEps, SQueryDag* pDag, void** pJob);
int32_t scheduleFetchRows(void *pRpc, void *pJob, void **data); int32_t scheduleFetchRows(void *pJob, void **data);
/** /**
...@@ -66,7 +66,7 @@ int32_t scheduleFetchRows(void *pRpc, void *pJob, void **data); ...@@ -66,7 +66,7 @@ int32_t scheduleFetchRows(void *pRpc, void *pJob, void **data);
* @param pJob * @param pJob
* @return * @return
*/ */
int32_t scheduleCancelJob(void *pRpc, void *pJob); int32_t scheduleCancelJob(void *pJob);
void scheduleFreeJob(void *pJob); void scheduleFreeJob(void *pJob);
......
...@@ -127,6 +127,8 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) { ...@@ -127,6 +127,8 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) {
int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
int32_t code = 0; int32_t code = 0;
job->queryId = dag->queryId;
if (dag->numOfSubplans <= 0) { if (dag->numOfSubplans <= 0) {
qError("invalid subplan num:%d", dag->numOfSubplans); qError("invalid subplan num:%d", dag->numOfSubplans);
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
...@@ -152,7 +154,6 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { ...@@ -152,7 +154,6 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
job->levelNum = levelNum; job->levelNum = levelNum;
job->levelIdx = levelNum - 1; job->levelIdx = levelNum - 1;
job->status = SCH_STATUS_NOT_START;
job->subPlans = dag->pSubplans; job->subPlans = dag->pSubplans;
...@@ -341,50 +342,6 @@ int32_t schTaskCheckAndSetRetry(SQueryJob *job, SQueryTask *task, int32_t errCod ...@@ -341,50 +342,6 @@ int32_t schTaskCheckAndSetRetry(SQueryJob *job, SQueryTask *task, int32_t errCod
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schHandleRspMsg(SQueryJob *job, SQueryTask *task, int32_t msgType, int32_t rspCode) {
int32_t code = 0;
switch (msgType) {
case TSDB_MSG_TYPE_QUERY:
if (rspCode != TSDB_CODE_SUCCESS) {
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode));
} else {
code = schAsyncSendMsg(job, task, TSDB_MSG_TYPE_RSP_READY);
if (code) {
goto _task_error;
}
}
break;
case TSDB_MSG_TYPE_RSP_READY:
if (rspCode != TSDB_CODE_SUCCESS) {
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode));
} else {
code = schProcessOnTaskSuccess(job, task);
if (code) {
goto _task_error;
}
}
break;
case TSDB_MSG_TYPE_FETCH:
SCH_ERR_JRET(rspCode);
SCH_ERR_JRET(schProcessOnDataFetched(job));
break;
default:
qError("unknown msg type:%d received", msgType);
return TSDB_CODE_QRY_INVALID_INPUT;
}
return TSDB_CODE_SUCCESS;
_task_error:
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, code));
return TSDB_CODE_SUCCESS;
_return:
code = schProcessOnJobFailure(job);
return code;
}
int32_t schFetchFromRemote(SQueryJob *job) { int32_t schFetchFromRemote(SQueryJob *job) {
int32_t code = 0; int32_t code = 0;
...@@ -500,6 +457,50 @@ int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCod ...@@ -500,6 +457,50 @@ int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCod
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schHandleRspMsg(SQueryJob *job, SQueryTask *task, int32_t msgType, int32_t rspCode) {
int32_t code = 0;
switch (msgType) {
case TSDB_MSG_TYPE_QUERY:
if (rspCode != TSDB_CODE_SUCCESS) {
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode));
} else {
code = schAsyncSendMsg(job, task, TSDB_MSG_TYPE_RSP_READY);
if (code) {
goto _task_error;
}
}
break;
case TSDB_MSG_TYPE_RSP_READY:
if (rspCode != TSDB_CODE_SUCCESS) {
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode));
} else {
code = schProcessOnTaskSuccess(job, task);
if (code) {
goto _task_error;
}
}
break;
case TSDB_MSG_TYPE_FETCH:
SCH_ERR_JRET(rspCode);
SCH_ERR_JRET(schProcessOnDataFetched(job));
break;
default:
qError("unknown msg type:%d received", msgType);
return TSDB_CODE_QRY_INVALID_INPUT;
}
return TSDB_CODE_SUCCESS;
_task_error:
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, code));
return TSDB_CODE_SUCCESS;
_return:
code = schProcessOnJobFailure(job);
return code;
}
...@@ -578,6 +579,13 @@ int32_t scheduleQueryJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pM ...@@ -578,6 +579,13 @@ int32_t scheduleQueryJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pM
tsem_init(&job->rspSem, 0, 0); tsem_init(&job->rspSem, 0, 0);
if (0 != taosHashPut(schMgmt.Jobs, &job->queryId, sizeof(job->queryId), &job, POINTER_BYTES)) {
qError("taosHashPut queryId:%"PRIx64" failed", job->queryId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
job->status = SCH_STATUS_NOT_START;
SCH_ERR_JRET(schLaunchJob(job)); SCH_ERR_JRET(schLaunchJob(job));
*(SQueryJob **)pJob = job; *(SQueryJob **)pJob = job;
...@@ -592,8 +600,8 @@ _return: ...@@ -592,8 +600,8 @@ _return:
SCH_RET(code); SCH_RET(code);
} }
int32_t scheduleFetchRows(void *pRpc, void *pJob, void **data) { int32_t scheduleFetchRows(void *pJob, void **data) {
if (NULL == pRpc || NULL == pJob || NULL == data) { if (NULL == pJob || NULL == data) {
return TSDB_CODE_QRY_INVALID_INPUT; return TSDB_CODE_QRY_INVALID_INPUT;
} }
...@@ -620,14 +628,31 @@ _return: ...@@ -620,14 +628,31 @@ _return:
return code; return code;
} }
int32_t scheduleCancelJob(void *pRpc, void *pJob); int32_t scheduleCancelJob(void *pJob) {
//TODO
void scheduleFreeJob(void *job) { return TSDB_CODE_SUCCESS;
if (NULL == job) { }
void scheduleFreeJob(void *pJob) {
if (NULL == pJob) {
return; return;
} }
//TODO SQueryJob *job = pJob;
if (job->status > 0) {
if (0 != taosHashRemove(schMgmt.Jobs, &job->queryId, sizeof(job->queryId))) {
qError("remove job:%"PRIx64"from mgmt failed", job->queryId); // maybe already freed
return;
}
if (job->status == SCH_STATUS_EXECUTING) {
scheduleCancelJob(pJob);
}
}
//TODO free job
} }
void schedulerDestroy(void) { void schedulerDestroy(void) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册