diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index ee626865fb85e3da5d799cb67a62ac3bc94d1ee0..68eae03f518441aad3aeb17837045cb66b34a92d 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -137,7 +137,7 @@ int32_t catalogGetTableHashVgroup(struct SCatalog* pCatalog, void *pRpc, const S int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp); -int32_t catalogGetQnodeList(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, SEpSet* pQnodeEpSet); +int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SEpSet* pQnodeEpSet); diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 10eba7f49cbfda363e442b555c083272278723c1..c5002e1b366e17bbbea6deebe87ff726e98e91bc 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -23,7 +23,7 @@ extern "C" { #include "planner.h" typedef struct SSchedulerCfg { - + int32_t clusterType; } SSchedulerCfg; typedef struct SQueryProfileSummary { @@ -55,7 +55,7 @@ typedef struct SQueryProfileSummary { * @param pJob * @return */ -int32_t scheduleQueryJob(void *pRpc, SQueryDag* pDag, void** pJob); +int32_t scheduleQueryJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pMgmtEps, SQueryDag* pDag, void** pJob); int32_t scheduleFetchRows(void *pRpc, void *pJob, void *data); diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 32cedb82b0ca834ead0822c0340919b508fe2d88..47731dde774c63e73734ff4cd842e2005f492308 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -634,6 +634,18 @@ _return: CTG_RET(code); } +int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SEpSet* pQnodeEpSet) { + if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pQnodeEpSet) { + CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); + } + + pQnodeEpSet->inUse = 0; + pQnodeEpSet->numOfEps = 0; + + return TSDB_CODE_SUCCESS; +} + + void catalogDestroy(void) { if (ctgMgmt.pCluster) { taosHashCleanup(ctgMgmt.pCluster); //TBD diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index a08ccda54b816c88a26bd94bc2757ef1681eebbf..c5ab74a85f132b07248f99a05e0e5196e002cfc0 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -29,6 +29,8 @@ extern "C" { #define SCHEDULE_DEFAULT_JOB_NUMBER 1000 #define SCHEDULE_DEFAULT_TASK_NUMBER 1000 +#define SCHEDULE_MAX_CONDIDATE_EP_NUM 3 + enum { SCH_STATUS_NOT_START = 1, SCH_STATUS_EXECUTING, @@ -40,6 +42,7 @@ enum { typedef struct SSchedulerMgmt { uint64_t taskId; + SSchedulerCfg cfg; SHashObj *Jobs; // key: queryId, value: SQueryJob* } SSchedulerMgmt; @@ -69,6 +72,10 @@ typedef struct SQueryJob { int32_t levelIdx; int8_t status; SQueryProfileSummary summary; + SEpSet dataSrcEps; + struct SCatalog *catalog; + void *rpc; + SEpSet *mgmtEpSet; SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask* SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask* @@ -78,7 +85,7 @@ typedef struct SQueryJob { } SQueryJob; #define SCH_TASK_READY_TO_LUNCH(task) ((task)->childReady >= taosArrayGetSize((task)->children)) // MAY NEED TO ENHANCE - +#define SCH_IS_DATA_SRC_TASK(task) (task->plan->type == QUERY_TYPE_SCAN) #define SCH_JOB_ERR_LOG(param, ...) qError("QID:%"PRIx64 param, job->queryId, __VA_ARGS__) #define SCH_TASK_ERR_LOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64 param, job->queryId, task->taskId, __VA_ARGS__) diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 37d5b617d558a778e1739b6b97bd97baf6849f8f..396930dc5548d401d6d2aae98638015ef35847bc 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -16,6 +16,7 @@ #include "schedulerInt.h" #include "taosmsg.h" #include "query.h" +#include "catalog.h" SSchedulerMgmt schMgmt = {0}; @@ -223,8 +224,16 @@ _return: SCH_RET(code); } -int32_t schAvailableEpSet(SEpSet *epSet) { +int32_t schAvailableEpSet(SQueryJob *job, SEpSet *epSet) { + SCH_ERR_RET(catalogGetQnodeList(job->catalog, job->rpc, job->mgmtEpSet, epSet)); + if (epSet->numOfEps > SCHEDULE_MAX_CONDIDATE_EP_NUM) { + return TSDB_CODE_SUCCESS; + } + + //TODO COPY dataSrcEps TO epSet + + return TSDB_CODE_SUCCESS; } @@ -267,6 +276,13 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { return TSDB_CODE_SUCCESS; } + if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCHEDULE_MAX_CONDIDATE_EP_NUM) { + strncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, sizeof(task->execAddr.fqdn)); + job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port; + + ++job->dataSrcEps.numOfEps; + } + for (int32_t i = 0; i < parentNum; ++i) { SQueryTask *par = taosArrayGet(task->parents, i); @@ -335,7 +351,7 @@ int32_t schTaskRun(SQueryJob *job, SQueryTask *task) { SCH_ERR_RET(qSubPlanToString(plan, &task->msg)); if (plan->execEpSet.numOfEps <= 0) { - SCH_ERR_RET(schAvailableEpSet(&plan->execEpSet)); + SCH_ERR_RET(schAvailableEpSet(job, &plan->execEpSet)); } SCH_ERR_RET(schAsyncLaunchTask(job, task)); @@ -362,12 +378,16 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", SCHEDULE_DEFAULT_JOB_NUMBER); } + if (cfg) { + schMgmt.cfg = *cfg; + } + return TSDB_CODE_SUCCESS; } -int32_t scheduleQueryJob(void *pRpc, SQueryDag* pDag, void** pJob) { - if (NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { +int32_t scheduleQueryJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pMgmtEps, SQueryDag* pDag, void** pJob) { + if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -377,6 +397,10 @@ int32_t scheduleQueryJob(void *pRpc, SQueryDag* pDag, void** pJob) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + job->catalog = pCatalog; + job->rpc = pRpc; + job->mgmtEpSet = pMgmtEps; + SCH_ERR_JRET(schValidateAndBuildJob(pDag, job)); job->execTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);