提交 d61e2bd1 编写于 作者: D dapan1121

define some scheduler apis

上级 eed7b6f0
...@@ -233,9 +233,9 @@ typedef struct { ...@@ -233,9 +233,9 @@ typedef struct {
} SEpAddrMsg; } SEpAddrMsg;
typedef struct { typedef struct {
char *fqdn; char fqdn[TSDB_FQDN_LEN];
uint16_t port; uint16_t port;
} SEpAddr1; } SEpAddr;
typedef struct { typedef struct {
int32_t numOfVnodes; int32_t numOfVnodes;
......
...@@ -55,9 +55,9 @@ typedef struct SQueryProfileSummary { ...@@ -55,9 +55,9 @@ typedef struct SQueryProfileSummary {
* @param pJob * @param pJob
* @return * @return
*/ */
int32_t scheduleQueryJob(SQueryDag* pDag, void** pJob); int32_t scheduleQueryJob(void *pRpc, SQueryDag* pDag, void** pJob);
int32_t scheduleFetchRows(void *pJob, void *data); int32_t scheduleFetchRows(void *pRpc, void *pJob, void *data);
/** /**
...@@ -65,7 +65,7 @@ int32_t scheduleFetchRows(void *pJob, void *data); ...@@ -65,7 +65,7 @@ int32_t scheduleFetchRows(void *pJob, void *data);
* @param pJob * @param pJob
* @return * @return
*/ */
int32_t scheduleCancelJob(void *pJob); int32_t scheduleCancelJob(void *pRpc, void *pJob);
void scheduleFreeJob(void *pJob); void scheduleFreeJob(void *pJob);
......
...@@ -508,6 +508,7 @@ int32_t* taosGetErrno(); ...@@ -508,6 +508,7 @@ int32_t* taosGetErrno();
//scheduler //scheduler
#define TSDB_CODE_SCH_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2501) //scheduler status error #define TSDB_CODE_SCH_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2501) //scheduler status error
#define TSDB_CODE_SCH_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2502) //scheduler internal error
......
...@@ -27,6 +27,7 @@ extern "C" { ...@@ -27,6 +27,7 @@ extern "C" {
#include "thash.h" #include "thash.h"
#define SCHEDULE_DEFAULT_JOB_NUMBER 1000 #define SCHEDULE_DEFAULT_JOB_NUMBER 1000
#define SCHEDULE_DEFAULT_TASK_NUMBER 1000
enum { enum {
SCH_STATUS_NOT_START = 1, SCH_STATUS_NOT_START = 1,
...@@ -43,18 +44,21 @@ typedef struct SSchedulerMgmt { ...@@ -43,18 +44,21 @@ typedef struct SSchedulerMgmt {
} SSchedulerMgmt; } SSchedulerMgmt;
typedef struct SQueryTask { typedef struct SQueryTask {
uint64_t taskId; // task id uint64_t taskId; // task id
char *msg; // operator tree SSubplan *plan; // subplan
int8_t status; // task status char *msg; // operator tree
SQueryProfileSummary summary; // task execution summary int8_t status; // task status
SEpAddr execAddr; // task actual executed node address
SQueryProfileSummary summary; // task execution summary
SArray *childern; // the datasource tasks,from which to fetch the result, element is SQueryTask*
SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask*
} SQueryTask; } SQueryTask;
typedef struct SQueryLevel { typedef struct SQueryLevel {
int32_t level;
int8_t status; int8_t status;
int32_t taskNum; int32_t taskNum;
SArray *subTasks; // Element is SQueryTask SArray *subTasks; // Element is SQueryTask
SArray *subPlans; // Element is SSubplan
} SQueryLevel; } SQueryLevel;
typedef struct SQueryJob { typedef struct SQueryJob {
...@@ -70,6 +74,7 @@ typedef struct SQueryJob { ...@@ -70,6 +74,7 @@ typedef struct SQueryJob {
#define SCH_JOB_ERR_LOG(param, ...) qError("QID:%"PRIx64 param, job->queryId, __VA_ARGS__) #define SCH_JOB_ERR_LOG(param, ...) qError("QID:%"PRIx64 param, job->queryId, __VA_ARGS__)
#define SCH_TASK_ERR_LOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64 param, job->queryId, task->taskId, __VA_ARGS__)
#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) #define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) #define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
......
...@@ -50,6 +50,66 @@ int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_ ...@@ -50,6 +50,66 @@ int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_
*/ */
} }
/**
 * Wire up the child/parent links between the tasks of a job.
 *
 * For every task in every level of the job, each subplan listed in the task's
 * plan (pChildern / pParents) is looked up in planToTask and the task found
 * there is appended to task->childern / task->parents. The arrays are only
 * created when the corresponding subplan list is non-empty.
 *
 * @param job         job whose levels and tasks are already populated
 * @param planToTask  hash from a POINTER_BYTES-wide subplan pointer key to a
 *                    POINTER_BYTES-wide SQueryTask pointer value
 * @return TSDB_CODE_SUCCESS on success;
 *         TSDB_CODE_QRY_OUT_OF_MEMORY if an array cannot be created or grown;
 *         TSDB_CODE_SCH_INTERNAL_ERROR if a related subplan has no task in
 *         planToTask. On failure terrno is set by SCH_ERR_RET.
 */
int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) {
  for (int32_t i = 0; i < job->levelNum; ++i) {
    SQueryLevel *level = taosArrayGet(job->levels, i);

    for (int32_t m = 0; m < level->taskNum; ++m) {
      SQueryTask *task = taosArrayGet(level->subTasks, m);
      SSubplan   *plan = task->plan;
      int32_t     childNum = (int32_t)taosArrayGetSize(plan->pChildern);
      int32_t     parentNum = (int32_t)taosArrayGetSize(plan->pParents);

      if (childNum > 0) {
        task->childern = taosArrayInit(childNum, POINTER_BYTES);
        if (NULL == task->childern) {
          qError("taosArrayInit %d failed", childNum);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      for (int32_t n = 0; n < childNum; ++n) {
        // NOTE(review): the key must be derived exactly as it was when
        // planToTask was filled; confirm the element type of pChildern
        // (SSubplan vs. SSubplan*) against the planner before relying on this.
        SSubplan *child = taosArrayGet(plan->pChildern, n);

        // The hash stores a task pointer as its value and the lookup returns
        // the address of that stored value, hence the double pointer.
        SQueryTask **childTask = taosHashGet(planToTask, &child, POINTER_BYTES);
        if (NULL == childTask || NULL == *childTask) {
          qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        // Push copies POINTER_BYTES from childTask, i.e. the task pointer itself.
        if (NULL == taosArrayPush(task->childern, childTask)) {
          qError("taosArrayPush failed");
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      if (parentNum > 0) {
        task->parents = taosArrayInit(parentNum, POINTER_BYTES);
        if (NULL == task->parents) {
          qError("taosArrayInit %d failed", parentNum);
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }

      for (int32_t n = 0; n < parentNum; ++n) {
        // NOTE(review): same key-derivation caveat as for the children above.
        SSubplan *parent = taosArrayGet(plan->pParents, n);

        SQueryTask **parentTask = taosHashGet(planToTask, &parent, POINTER_BYTES);
        if (NULL == parentTask || NULL == *parentTask) {
          qError("subplan relationship error, level:%d, taskIdx:%d, parentIdx:%d", i, m, n);
          SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
        }

        if (NULL == taosArrayPush(task->parents, parentTask)) {
          qError("taosArrayPush failed");
          SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
        }
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}
int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
int32_t code = 0; int32_t code = 0;
int32_t levelNum = (int32_t)taosArrayGetSize(dag->pSubplans); int32_t levelNum = (int32_t)taosArrayGetSize(dag->pSubplans);
...@@ -58,10 +118,16 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { ...@@ -58,10 +118,16 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
SHashObj *planToTask = taosHashInit(SCHEDULE_DEFAULT_TASK_NUMBER, taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
if (NULL == planToTask) {
qError("taosHashInit %d failed", SCHEDULE_DEFAULT_TASK_NUMBER);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
job->levels = taosArrayInit(levelNum, sizeof(SQueryLevel)); job->levels = taosArrayInit(levelNum, sizeof(SQueryLevel));
if (NULL == job->levels) { if (NULL == job->levels) {
qError("taosArrayInit %d failed", levelNum); qError("taosArrayInit %d failed", levelNum);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
job->levelNum = levelNum; job->levelNum = levelNum;
...@@ -77,32 +143,39 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { ...@@ -77,32 +143,39 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
level.status = SCH_STATUS_NOT_START; level.status = SCH_STATUS_NOT_START;
for (int32_t i = 0; i < levelNum; ++i) { for (int32_t i = 0; i < levelNum; ++i) {
level.level = i;
levelPlans = taosArrayGetP(dag->pSubplans, i); levelPlans = taosArrayGetP(dag->pSubplans, i);
if (NULL == levelPlans) { if (NULL == levelPlans) {
qError("no level plans for level %d", i); qError("no level plans for level %d", i);
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
} }
levelPlanNum = (int32_t)taosArrayGetSize(levelPlans); levelPlanNum = (int32_t)taosArrayGetSize(levelPlans);
if (levelPlanNum <= 0) { if (levelPlanNum <= 0) {
qError("invalid level plans number:%d, level:%d", levelPlanNum, i); qError("invalid level plans number:%d, level:%d", levelPlanNum, i);
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
} }
level.taskNum = levelPlanNum; level.taskNum = levelPlanNum;
level.subPlans = levelPlans;
level.subTasks = taosArrayInit(levelPlanNum, sizeof(SQueryTask)); level.subTasks = taosArrayInit(levelPlanNum, sizeof(SQueryTask));
if (NULL == level.subTasks) { if (NULL == level.subTasks) {
qError("taosArrayInit %d failed", levelPlanNum); qError("taosArrayInit %d failed", levelPlanNum);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
for (int32_t n = 0; n < levelPlanNum; ++n) { for (int32_t n = 0; n < levelPlanNum; ++n) {
SSubplan *plan = taosArrayGet(levelPlans, n);
SQueryTask *task = taosArrayGet(level.subTasks, n); SQueryTask *task = taosArrayGet(level.subTasks, n);
task->taskId = atomic_add_fetch_64(&schMgmt.taskId, 1); task->taskId = atomic_add_fetch_64(&schMgmt.taskId, 1);
task->plan = plan;
task->status = SCH_STATUS_NOT_START; task->status = SCH_STATUS_NOT_START;
if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &task, POINTER_BYTES)) {
qError("taosHashPut failed");
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
} }
if (NULL == taosArrayPush(job->levels, &level)) { if (NULL == taosArrayPush(job->levels, &level)) {
...@@ -111,6 +184,12 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { ...@@ -111,6 +184,12 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
} }
} }
SCH_ERR_JRET(schBuildTaskRalation(job, planToTask));
if (planToTask) {
taosHashCleanup(planToTask);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_return: _return:
...@@ -118,20 +197,64 @@ _return: ...@@ -118,20 +197,64 @@ _return:
taosArrayDestroy(level.subTasks); taosArrayDestroy(level.subTasks);
} }
if (planToTask) {
taosHashCleanup(planToTask);
}
SCH_RET(code); SCH_RET(code);
} }
int32_t schAvailableEpSet(SEpSet *epSet) {
int32_t schJobExecute(SQueryJob *job) { }
switch (job->status) {
case SCH_STATUS_NOT_START:
int32_t schAsyncLaunchTask(SQueryJob *job, SQueryTask *task) {
}
int32_t schTaskRun(SQueryJob *job, SQueryTask *task) {
SSubplan *plan = task->plan;
switch (task->status) {
case SCH_STATUS_NOT_START:
SCH_ERR_RET(qSubPlanToString(plan, &task->msg));
if (plan->execEpSet.numOfEps <= 0) {
SCH_ERR_RET(schAvailableEpSet(&plan->execEpSet));
}
SCH_ERR_RET(schAsyncLaunchTask(job, task));
break;
case SCH_STATUS_EXECUTING:
break;
case SCH_STATUS_SUCCEED:
break; break;
default: default:
SCH_JOB_ERR_LOG("invalid job status:%d", job->status); SCH_JOB_ERR_LOG("invalid level status:%d, levelIdx:%d", job->status, job->levelIdx);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
} }
return TSDB_CODE_SUCCESS;
}
int32_t schJobRun(SQueryJob *job) {
bool cont = true;
while (cont) {
switch (job->status) {
case SCH_STATUS_NOT_START:
case SCH_STATUS_EXECUTING:
break;
default:
SCH_JOB_ERR_LOG("invalid job status:%d", job->status);
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
}
return TSDB_CODE_SUCCESS;
} }
...@@ -145,7 +268,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { ...@@ -145,7 +268,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
} }
int32_t scheduleQueryJob(SQueryDag* pDag, void** pJob) { int32_t scheduleQueryJob(void *pRpc, SQueryDag* pDag, void** pJob) {
if (NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { if (NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
...@@ -158,7 +281,7 @@ int32_t scheduleQueryJob(SQueryDag* pDag, void** pJob) { ...@@ -158,7 +281,7 @@ int32_t scheduleQueryJob(SQueryDag* pDag, void** pJob) {
SCH_ERR_JRET(schValidateAndBuildJob(pDag, job)); SCH_ERR_JRET(schValidateAndBuildJob(pDag, job));
SCH_ERR_JRET(schJobExecute(job)); SCH_ERR_JRET(schJobRun(job));
*(SQueryJob **)pJob = job; *(SQueryJob **)pJob = job;
...@@ -172,9 +295,9 @@ _return: ...@@ -172,9 +295,9 @@ _return:
SCH_RET(code); SCH_RET(code);
} }
int32_t scheduleFetchRows(void *pJob, void *data); int32_t scheduleFetchRows(void *pRpc, void *pJob, void *data);
int32_t scheduleCancelJob(void *pJob); int32_t scheduleCancelJob(void *pRpc, void *pJob);
void scheduleFreeJob(void *pJob) { void scheduleFreeJob(void *pJob) {
......
...@@ -503,6 +503,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_CTG_SYS_ERROR, "catalog system error" ...@@ -503,6 +503,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_CTG_SYS_ERROR, "catalog system error"
//scheduler //scheduler
TAOS_DEFINE_ERROR(TSDB_CODE_SCH_STATUS_ERROR, "scheduler status error") TAOS_DEFINE_ERROR(TSDB_CODE_SCH_STATUS_ERROR, "scheduler status error")
TAOS_DEFINE_ERROR(TSDB_CODE_SCH_INTERNAL_ERROR, "scheduler internal error")
#ifdef TAOS_ERROR_C #ifdef TAOS_ERROR_C
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册