提交 c9940c0e 编写于 作者: D dapan

feature scheduler

上级 97d7d3ac
......@@ -141,8 +141,8 @@ int32_t qCreateQueryDag(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet*
// Set datasource of this subplan, multiple calls may be made to a subplan.
// @subplan subplan to be schedule
// @templateId templateId of a group of datasource subplans of this @subplan
// @eps Execution location of this group of datasource subplans, is an array of SEpAddr structures
int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SArray* eps);
// @ep one execution location of this group of datasource subplans
int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep);
int32_t qExplainQuery(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* pQnode, char** str);
......
......@@ -639,8 +639,6 @@ int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet*
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
pQnodeEpSet->inUse = 0;
pQnodeEpSet->numOfEps = 0;
return TSDB_CODE_SUCCESS;
}
......
......@@ -29,7 +29,7 @@ extern "C" {
#define SCHEDULE_DEFAULT_JOB_NUMBER 1000
#define SCHEDULE_DEFAULT_TASK_NUMBER 1000
#define SCHEDULE_MAX_CONDIDATE_EP_NUM 3
#define SCH_MAX_CONDIDATE_EP_NUM TSDB_MAX_REPLICA
enum {
SCH_STATUS_NOT_START = 1,
......@@ -54,7 +54,6 @@ typedef struct SQueryTask {
SEpAddr execAddr; // task actual executed node address
SQueryProfileSummary summary; // task execution summary
int32_t childReady; // child task ready number
SArray *childSrcEp; // child Eps, element is SEpAddr
SArray *childern; // the datasource tasks,from which to fetch the result, element is SQueryTask*
SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask*
} SQueryTask;
......@@ -73,9 +72,13 @@ typedef struct SQueryJob {
int8_t status;
SQueryProfileSummary summary;
SEpSet dataSrcEps;
SEpAddr resEp;
struct SCatalog *catalog;
void *rpc;
SEpSet *mgmtEpSet;
tsem_t rspSem;
int32_t userFetch;
void *res;
SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask*
SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask*
......
......@@ -224,14 +224,21 @@ _return:
SCH_RET(code);
}
int32_t schAvailableEpSet(SQueryJob *job, SEpSet *epSet) {
SCH_ERR_RET(catalogGetQnodeList(job->catalog, job->rpc, job->mgmtEpSet, epSet));
if (epSet->numOfEps > SCHEDULE_MAX_CONDIDATE_EP_NUM) {
int32_t schAvailableEpSet(SQueryJob *job, SEpSet *epSet) {
if (epSet->numOfEps >= SCH_MAX_CONDIDATE_EP_NUM) {
return TSDB_CODE_SUCCESS;
}
//TODO COPY dataSrcEps TO epSet
if (SCH_HAS_QNODE_IN_CLUSTER(schMgmt.cfg.clusterType)) {
SCH_ERR_RET(catalogGetQnodeList(job->catalog, job->rpc, job->mgmtEpSet, epSet));
} else {
for (int32_t i = 0; i < job->dataSrcEps.numOfEps; ++i) {
strncpy(epSet->fqdn[epSet->numOfEps], &job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn));
epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i];
++epSet->numOfEps;
}
}
return TSDB_CODE_SUCCESS;
}
......@@ -245,6 +252,10 @@ int32_t schTaskCheckAndSetRetry(SQueryJob *job, SQueryTask *task, int32_t errCod
}
int32_t schFetchFromRemote(SQueryJob *job) {
}
int32_t schProcessOnJobSuccess(SQueryJob *job) {
......@@ -271,12 +282,15 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
strncpy(job->resEp.fqdn, task->execAddr.fqdn, sizeof(job->resEp.fqdn));
job->resEp.port = task->execAddr.port;
SCH_ERR_RET(schProcessOnJobSuccess());
return TSDB_CODE_SUCCESS;
}
if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCHEDULE_MAX_CONDIDATE_EP_NUM) {
if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CONDIDATE_EP_NUM) {
strncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, sizeof(task->execAddr.fqdn));
job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port;
......@@ -288,10 +302,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) {
++par->childReady;
if (NULL == taosArrayPush(par->childSrcEp, &task->execAddr)) {
qError("taosArrayPush failed");
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SCH_ERR_RET(qSetSubplanExecutionNode(par->plan, task->plan->id.templateId, &task->execAddr));
if (SCH_TASK_READY_TO_LUNCH(par)) {
SCH_ERR_RET(schTaskRun(job, task));
......@@ -414,6 +425,8 @@ int32_t scheduleQueryJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pM
qError("taosHashInit %d failed", pDag->numOfSubplans);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
tsem_init(&job->rspSem, 0, 0);
SCH_ERR_JRET(schJobRun(job));
......@@ -429,7 +442,26 @@ _return:
SCH_RET(code);
}
int32_t scheduleFetchRows(void *pRpc, void *pJob, void *data);
int32_t scheduleFetchRows(void *pRpc, void *pJob, void **data) {
if (NULL == pRpc || NULL == pJob || NULL == data) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
SQueryJob *job = pJob;
if (atomic_val_compare_exchange_32(&job->userFetch, 0, 1) != 0) {
qError("prior fetching not finished");
return TSDB_CODE_QRY_APP_ERROR;
}
SCH_ERR_RET(schFetchFromRemote(job));
tsem_wait(&job->rspSem);
*data = job->res;
return TSDB_CODE_SUCCESS;
}
int32_t scheduleCancelJob(void *pRpc, void *pJob);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册