提交 76738ad2 编写于 作者: D dapan

feature scheduler

上级 8dd9cc74
......@@ -137,7 +137,7 @@ int32_t catalogGetTableHashVgroup(struct SCatalog* pCatalog, void *pRpc, const S
int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp);
int32_t catalogGetQnodeList(struct SCatalog* pCatalog, const SEpSet* pMgmtEps, SEpSet* pQnodeEpSet);
int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SEpSet* pQnodeEpSet);
......
......@@ -23,7 +23,7 @@ extern "C" {
#include "planner.h"
typedef struct SSchedulerCfg {
int32_t clusterType;
} SSchedulerCfg;
typedef struct SQueryProfileSummary {
......@@ -55,7 +55,7 @@ typedef struct SQueryProfileSummary {
* @param pJob
* @return
*/
int32_t scheduleQueryJob(void *pRpc, SQueryDag* pDag, void** pJob);
int32_t scheduleQueryJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pMgmtEps, SQueryDag* pDag, void** pJob);
int32_t scheduleFetchRows(void *pRpc, void *pJob, void *data);
......
......@@ -634,6 +634,18 @@ _return:
CTG_RET(code);
}
int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SEpSet* pQnodeEpSet) {
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pQnodeEpSet) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
pQnodeEpSet->inUse = 0;
pQnodeEpSet->numOfEps = 0;
return TSDB_CODE_SUCCESS;
}
void catalogDestroy(void) {
if (ctgMgmt.pCluster) {
taosHashCleanup(ctgMgmt.pCluster); //TBD
......
......@@ -29,6 +29,8 @@ extern "C" {
#define SCHEDULE_DEFAULT_JOB_NUMBER 1000
#define SCHEDULE_DEFAULT_TASK_NUMBER 1000
#define SCHEDULE_MAX_CONDIDATE_EP_NUM 3
enum {
SCH_STATUS_NOT_START = 1,
SCH_STATUS_EXECUTING,
......@@ -40,6 +42,7 @@ enum {
typedef struct SSchedulerMgmt {
uint64_t taskId;
SSchedulerCfg cfg;
SHashObj *Jobs; // key: queryId, value: SQueryJob*
} SSchedulerMgmt;
......@@ -69,6 +72,10 @@ typedef struct SQueryJob {
int32_t levelIdx;
int8_t status;
SQueryProfileSummary summary;
SEpSet dataSrcEps;
struct SCatalog *catalog;
void *rpc;
SEpSet *mgmtEpSet;
SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask*
SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask*
......@@ -78,7 +85,7 @@ typedef struct SQueryJob {
} SQueryJob;
#define SCH_TASK_READY_TO_LUNCH(task) ((task)->childReady >= taosArrayGetSize((task)->children)) // MAY NEED TO ENHANCE
#define SCH_IS_DATA_SRC_TASK(task) (task->plan->type == QUERY_TYPE_SCAN)
#define SCH_JOB_ERR_LOG(param, ...) qError("QID:%"PRIx64 param, job->queryId, __VA_ARGS__)
#define SCH_TASK_ERR_LOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64 param, job->queryId, task->taskId, __VA_ARGS__)
......
......@@ -16,6 +16,7 @@
#include "schedulerInt.h"
#include "taosmsg.h"
#include "query.h"
#include "catalog.h"
SSchedulerMgmt schMgmt = {0};
......@@ -223,8 +224,16 @@ _return:
SCH_RET(code);
}
int32_t schAvailableEpSet(SEpSet *epSet) {
int32_t schAvailableEpSet(SQueryJob *job, SEpSet *epSet) {
SCH_ERR_RET(catalogGetQnodeList(job->catalog, job->rpc, job->mgmtEpSet, epSet));
if (epSet->numOfEps > SCHEDULE_MAX_CONDIDATE_EP_NUM) {
return TSDB_CODE_SUCCESS;
}
//TODO COPY dataSrcEps TO epSet
return TSDB_CODE_SUCCESS;
}
......@@ -267,6 +276,13 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) {
return TSDB_CODE_SUCCESS;
}
if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCHEDULE_MAX_CONDIDATE_EP_NUM) {
strncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, sizeof(task->execAddr.fqdn));
job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port;
++job->dataSrcEps.numOfEps;
}
for (int32_t i = 0; i < parentNum; ++i) {
SQueryTask *par = taosArrayGet(task->parents, i);
......@@ -335,7 +351,7 @@ int32_t schTaskRun(SQueryJob *job, SQueryTask *task) {
SCH_ERR_RET(qSubPlanToString(plan, &task->msg));
if (plan->execEpSet.numOfEps <= 0) {
SCH_ERR_RET(schAvailableEpSet(&plan->execEpSet));
SCH_ERR_RET(schAvailableEpSet(job, &plan->execEpSet));
}
SCH_ERR_RET(schAsyncLaunchTask(job, task));
......@@ -362,12 +378,16 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", SCHEDULE_DEFAULT_JOB_NUMBER);
}
if (cfg) {
schMgmt.cfg = *cfg;
}
return TSDB_CODE_SUCCESS;
}
int32_t scheduleQueryJob(void *pRpc, SQueryDag* pDag, void** pJob) {
if (NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
int32_t scheduleQueryJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pMgmtEps, SQueryDag* pDag, void** pJob) {
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
......@@ -377,6 +397,10 @@ int32_t scheduleQueryJob(void *pRpc, SQueryDag* pDag, void** pJob) {
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
job->catalog = pCatalog;
job->rpc = pRpc;
job->mgmtEpSet = pMgmtEps;
SCH_ERR_JRET(schValidateAndBuildJob(pDag, job));
job->execTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册