From a702b1460a93b76600a34460580a6de3b5c36e71 Mon Sep 17 00:00:00 2001
From: dapan
Date: Sat, 18 Dec 2021 17:35:27 +0800
Subject: [PATCH] fix compile error

---
 include/libs/scheduler/scheduler.h       |  3 +-
 source/libs/planner/inc/plannerInt.h     |  2 +-
 source/libs/planner/src/physicalPlan.c   |  2 +-
 source/libs/planner/src/planner.c        |  4 +-
 source/libs/scheduler/CMakeLists.txt     |  4 +-
 source/libs/scheduler/inc/schedulerInt.h |  5 +-
 source/libs/scheduler/src/scheduler.c    | 66 ++++++++++++------------
 7 files changed, 46 insertions(+), 40 deletions(-)

diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h
index c5002e1b36..c5f3cd8f0f 100644
--- a/include/libs/scheduler/scheduler.h
+++ b/include/libs/scheduler/scheduler.h
@@ -21,6 +21,7 @@ extern "C" {
 #endif
 
 #include "planner.h"
+#include "catalog.h"
 
 typedef struct SSchedulerCfg {
   int32_t clusterType;
@@ -57,7 +58,7 @@ typedef struct SQueryProfileSummary {
  */
 int32_t scheduleQueryJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pMgmtEps, SQueryDag* pDag, void** pJob);
 
-int32_t scheduleFetchRows(void *pRpc, void *pJob, void *data);
+int32_t scheduleFetchRows(void *pRpc, void *pJob, void **data);
 
 
 /**
diff --git a/source/libs/planner/inc/plannerInt.h b/source/libs/planner/inc/plannerInt.h
index 19563a8a0c..c9b7b6d235 100644
--- a/source/libs/planner/inc/plannerInt.h
+++ b/source/libs/planner/inc/plannerInt.h
@@ -100,7 +100,7 @@ int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str);
 int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql);
 
 int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag);
-int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SArray* eps);
+int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep);
 int32_t subPlanToString(const SSubplan *pPhyNode, char** str);
 int32_t stringToSubplan(const char* str, SSubplan** subplan);
 
diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c
index f187ec0ec9..72b30012c8 100644
--- a/source/libs/planner/src/physicalPlan.c
+++ b/source/libs/planner/src/physicalPlan.c
@@ -225,6 +225,6 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD
   return TSDB_CODE_SUCCESS;
 }
 
-int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SArray* eps) {
+int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep) {
   //todo
 }
diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c
index 3a90acb5fd..e9a4591d4a 100644
--- a/source/libs/planner/src/planner.c
+++ b/source/libs/planner/src/planner.c
@@ -46,8 +46,8 @@ int32_t qCreateQueryDag(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet*
   return TSDB_CODE_SUCCESS;
 }
 
-int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SArray* eps) {
-  return setSubplanExecutionNode(subplan, templateId, eps);
+int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep) {
+  return setSubplanExecutionNode(subplan, templateId, ep);
 }
 
 int32_t qSubPlanToString(const SSubplan *subplan, char** str) {
diff --git a/source/libs/scheduler/CMakeLists.txt b/source/libs/scheduler/CMakeLists.txt
index 31f1c25bea..cdb4e08205 100644
--- a/source/libs/scheduler/CMakeLists.txt
+++ b/source/libs/scheduler/CMakeLists.txt
@@ -9,5 +9,5 @@ target_include_directories(
 
 target_link_libraries(
     scheduler
-    PRIVATE os util planner qcom common
-)
\ No newline at end of file
+    PRIVATE os util planner qcom common catalog transport
+)
diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h
index 368f01e5ac..4f363aa032 100644
--- a/source/libs/scheduler/inc/schedulerInt.h
+++ b/source/libs/scheduler/inc/schedulerInt.h
@@ -54,7 +54,7 @@ typedef struct SQueryTask {
   SEpAddr              execAddr;      // task actual executed node address
   SQueryProfileSummary summary;       // task execution summary
   int32_t              childReady;    // child task ready number
-  SArray              *childern;      // the datasource tasks,from which to fetch the result, element is SQueryTask*
+  SArray              *children;      // the datasource tasks,from which to fetch the result, element is SQueryTask*
   SArray              *parents;       // the data destination tasks, get data from current task, element is SQueryTask*
 } SQueryTask;
 
@@ -87,6 +87,7 @@ typedef struct SQueryJob {
   SArray  *subPlans;  // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0.
 } SQueryJob;
 
+#define SCH_HAS_QNODE_IN_CLUSTER(type) (false) //TODO CLUSTER TYPE
 #define SCH_TASK_READY_TO_LUNCH(task) ((task)->childReady >= taosArrayGetSize((task)->children))  // MAY NEED TO ENHANCE
 #define SCH_IS_DATA_SRC_TASK(task) (task->plan->type == QUERY_TYPE_SCAN)
 
@@ -99,6 +100,8 @@ typedef struct SQueryJob {
 
 #define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
 
+extern int32_t schTaskRun(SQueryJob *job, SQueryTask *task);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c
index a7676b6c76..3862ad1ade 100644
--- a/source/libs/scheduler/src/scheduler.c
+++ b/source/libs/scheduler/src/scheduler.c
@@ -62,8 +62,8 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) {
     int32_t parentNum = (int32_t)taosArrayGetSize(plan->pParents);
 
     if (childNum > 0) {
-      task->childern = taosArrayInit(childNum, POINTER_BYTES);
-      if (NULL == task->childern) {
+      task->children = taosArrayInit(childNum, POINTER_BYTES);
+      if (NULL == task->children) {
         qError("taosArrayInit %d failed", childNum);
         SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
       }
@@ -77,7 +77,7 @@
         SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
       }
 
-      if (NULL == taosArrayPush(task->childern, &childTask)) {
+      if (NULL == taosArrayPush(task->children, &childTask)) {
         qError("taosArrayPush failed");
         SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
       }
@@ -233,7 +233,7 @@ int32_t schAvailableEpSet(SQueryJob *job, SEpSet *epSet) {
     SCH_ERR_RET(catalogGetQnodeList(job->catalog, job->rpc, job->mgmtEpSet, epSet));
   } else {
     for (int32_t i = 0; i < job->dataSrcEps.numOfEps; ++i) {
-      strncpy(epSet->fqdn[epSet->numOfEps], &job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn));
+      strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i]));
       epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i];
 
       ++epSet->numOfEps;
@@ -244,6 +244,32 @@
 
 }
 
+int32_t schPushTaskToExecList(SQueryJob *job, SQueryTask *task) {
+  if (0 != taosHashPut(job->execTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) {
+    qError("taosHashPut failed");
+    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
+  }
+
+  return TSDB_CODE_SUCCESS;
+}
+
+int32_t schMoveTaskToSuccList(SQueryJob *job, SQueryTask *task, bool *moved) {
+  if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) {
+    qWarn("remove task[%"PRIx64"] from execTasks failed", task->taskId);
+    return TSDB_CODE_SUCCESS;
+  }
+
+  if (0 != taosHashPut(job->execTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) {
+    qError("taosHashPut failed");
+    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
+  }
+
+  *moved = true;
+
+  return TSDB_CODE_SUCCESS;
+}
+
+
 int32_t schAsyncLaunchTask(SQueryJob *job, SQueryTask *task) {
 
 }
@@ -271,7 +297,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) {
 
   SCH_ERR_RET(schMoveTaskToSuccList(job, task, &moved));
   if (!moved) {
-    qWarn("task[%d] already moved", task->taskId);
+    SCH_TASK_ERR_LOG("task may already moved, status:%d", task->status);
     return TSDB_CODE_SUCCESS;
   }
 
@@ -285,7 +311,7 @@
     strncpy(job->resEp.fqdn, task->execAddr.fqdn, sizeof(job->resEp.fqdn));
     job->resEp.port = task->execAddr.port;
 
-    SCH_ERR_RET(schProcessOnJobSuccess());
+    SCH_ERR_RET(schProcessOnJobSuccess(job));
 
     return TSDB_CODE_SUCCESS;
   }
@@ -331,30 +357,6 @@ int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCod
 }
 
 
-int32_t schPushTaskToExecList(SQueryJob *job, SQueryTask *task) {
-  if (0 != taosHashPut(job->execTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) {
-    qError("taosHashPut failed");
-    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
-  }
-
-  return TSDB_CODE_SUCCESS;
-}
-
-int32_t schMoveTaskToSuccList(SQueryJob *job, SQueryTask *task, bool *moved) {
-  if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) {
-    qWarn("remove task[%"PRIx64"] from execTasks failed", task->taskId);
-    return TSDB_CODE_SUCCESS
-  }
-
-  if (0 != taosHashPut(job->execTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) {
-    qError("taosHashPut failed");
-    SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
-  }
-
-  *moved = true;
-
-  return TSDB_CODE_SUCCESS;
-}
 
 
 int32_t schTaskRun(SQueryJob *job, SQueryTask *task) {
@@ -367,7 +369,7 @@ int32_t schTaskRun(SQueryJob *job, SQueryTask *task) {
 
   SCH_ERR_RET(schAsyncLaunchTask(job, task));
 
-  SCH_ERR_RET(schPushTaskToExecList(job, task))
+  SCH_ERR_RET(schPushTaskToExecList(job, task));
 
   return TSDB_CODE_SUCCESS;
 }
@@ -410,7 +412,7 @@ int32_t scheduleQueryJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pM
 
   job->catalog = pCatalog;
   job->rpc = pRpc;
-  job->mgmtEpSet = pMgmtEps;
+  job->mgmtEpSet = (SEpSet *)pMgmtEps;
 
   SCH_ERR_JRET(schValidateAndBuildJob(pDag, job));
 
-- 
GitLab