From af96f5b5030a5f50f33d28cda39ff58547b63698 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 23 Dec 2021 20:01:51 +0800 Subject: [PATCH] feature/qnode --- include/common/taosmsg.h | 4 - source/libs/qworker/src/qworker.c | 12 ++ source/libs/scheduler/inc/schedulerInt.h | 29 +++- source/libs/scheduler/src/scheduler.c | 168 +++++++++++++++++++---- 4 files changed, 172 insertions(+), 41 deletions(-) diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index ec51a67808..8fd1ca9e9f 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -587,10 +587,6 @@ typedef struct { typedef struct { int32_t code; - union { - uint64_t qhandle; - uint64_t qId; - }; // query handle } SQueryTableRsp; // todo: the show handle should be replaced with id diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 628077a020..6955da8a8c 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -553,7 +553,19 @@ _return: int32_t qwBuildAndSendQueryRsp(SRpcMsg *pMsg, int32_t code) { + SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp)); + pRsp->code = code; + SRpcMsg rpcRsp = { + .handle = pMsg->handle, + .pCont = pRsp, + .contLen = sizeof(*pRsp), + .code = code, + }; + + rpcSendResponse(&rpcRsp); + + return TSDB_CODE_SUCCESS; } int32_t qwBuildAndSendReadyRsp(SRpcMsg *pMsg, int32_t code) { diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 3fab91edac..f3de499dcd 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -31,17 +31,24 @@ extern "C" { #define SCH_MAX_CONDIDATE_EP_NUM TSDB_MAX_REPLICA +enum { + SCH_READ = 1, + SCH_WRITE, +}; + typedef struct SSchedulerMgmt { uint64_t taskId; uint64_t schedulerId; SSchedulerCfg cfg; - SHashObj *Jobs; // key: queryId, value: SQueryJob* + SHashObj *jobs; // key: queryId, value: SQueryJob* } SSchedulerMgmt; typedef struct SQueryTask { 
uint64_t taskId; // task id + SQueryLevel *level; // level SSubplan *plan; // subplan char *msg; // operator tree + int32_t msgLen; // msg length int8_t status; // task status SEpAddr execAddr; // task actual executed node address SQueryProfileSummary summary; // task execution summary @@ -51,10 +58,13 @@ typedef struct SQueryTask { } SQueryTask; typedef struct SQueryLevel { - int32_t level; - int8_t status; - int32_t taskNum; - SArray *subTasks; // Element is SQueryTask + int32_t level; + int8_t status; + SRWLatch lock; + int32_t taskFailed; + int32_t taskSucceed; + int32_t taskNum; + SArray *subTasks; // Element is SQueryTask } SQueryLevel; typedef struct SQueryJob { @@ -63,8 +73,8 @@ int32_t levelIdx; int8_t status; SQueryProfileSummary summary; - SEpSet dataSrcEps; - SEpAddr resEp; + SEpSet dataSrcEps; + SEpAddr resEp; void *transport; SArray *qnodeList; tsem_t rspSem; @@ -74,6 +84,7 @@ SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask* SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask* + SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask* SArray *levels; // Element is SQueryLevel, starting from 0. SArray *subPlans; // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0. @@ -82,6 +93,7 @@ #define SCH_HAS_QNODE_IN_CLUSTER(type) (false) //TODO CLUSTER TYPE #define SCH_TASK_READY_TO_LUNCH(task) ((task)->childReady >= taosArrayGetSize((task)->children)) // MAY NEED TO ENHANCE #define SCH_IS_DATA_SRC_TASK(task) (task->plan->type == QUERY_TYPE_SCAN) +#define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->type == QUERY_TYPE_MODIFY) #define SCH_JOB_ERR_LOG(param, ...) qError("QID:%"PRIx64 param, job->queryId, __VA_ARGS__) #define SCH_TASK_ERR_LOG(param, ...) 
qError("QID:%"PRIx64",TID:%"PRIx64 param, job->queryId, task->taskId, __VA_ARGS__) @@ -91,6 +103,9 @@ typedef struct SQueryJob { #define SCH_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); terrno = _code; return _code; } } while (0) #define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) +#define SCH_LOCK(type, _lock) (SCH_READ == (type) ? taosRLockLatch(_lock) : taosWLockLatch(_lock)) +#define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock)) + extern int32_t schLaunchTask(SQueryJob *job, SQueryTask *task); diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 83507c8dd7..a2fbdbe924 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -160,11 +160,19 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { SQueryLevel level = {0}; SArray *levelPlans = NULL; int32_t levelPlanNum = 0; + SQueryLevel *pLevel = NULL; level.status = JOB_TASK_STATUS_NOT_START; for (int32_t i = 0; i < levelNum; ++i) { - level.level = i; + if (NULL == taosArrayPush(job->levels, &level)) { + qError("taosArrayPush failed"); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + pLevel = taosArrayGet(job->levels, i); + + pLevel->level = i; levelPlans = taosArrayGetP(dag->pSubplans, i); if (NULL == levelPlans) { qError("no level plans for level %d", i); @@ -177,10 +185,10 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } - level.taskNum = levelPlanNum; + pLevel->taskNum = levelPlanNum; - level.subTasks = taosArrayInit(levelPlanNum, sizeof(SQueryTask)); - if (NULL == level.subTasks) { + pLevel->subTasks = taosArrayInit(levelPlanNum, sizeof(SQueryTask)); + if (NULL == pLevel->subTasks) { qError("taosArrayInit %d failed", levelPlanNum); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ 
-191,9 +199,10 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { task.taskId = atomic_add_fetch_64(&schMgmt.taskId, 1); task.plan = plan; + task.level = pLevel; task.status = JOB_TASK_STATUS_NOT_START; - void *p = taosArrayPush(level.subTasks, &task); + void *p = taosArrayPush(pLevel->subTasks, &task); if (NULL == p) { qError("taosArrayPush failed"); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -205,10 +214,6 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { } } - if (NULL == taosArrayPush(job->levels, &level)) { - qError("taosArrayPush failed"); - SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } } SCH_ERR_JRET(schBuildTaskRalation(job, planToTask)); @@ -220,8 +225,8 @@ return TSDB_CODE_SUCCESS; _return: - if (level.subTasks) { - taosArrayDestroy(level.subTasks); + if (pLevel && pLevel->subTasks) { + taosArrayDestroy(pLevel->subTasks); } if (planToTask) { @@ -273,7 +278,23 @@ int32_t schMoveTaskToSuccList(SQueryJob *job, SQueryTask *task, bool *moved) { return TSDB_CODE_SUCCESS; } - if (0 != taosHashPut(job->execTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) { + if (0 != taosHashPut(job->succTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) { + qError("taosHashPut failed"); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + *moved = true; + + return TSDB_CODE_SUCCESS; +} + +int32_t schMoveTaskToFailList(SQueryJob *job, SQueryTask *task, bool *moved) { + if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) { + qWarn("remove task[%"PRIx64"] from execTasks failed", task->taskId); + return TSDB_CODE_SUCCESS; + } + + if (0 != taosHashPut(job->failTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) { qError("taosHashPut failed"); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -289,14 +310,23 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { void *msg = NULL; switch (msgType) { 
+ case TSDB_MSG_TYPE_SUBMIT: { + if (NULL == task->msg || task->msgLen <= 0) { + qError("submit msg is NULL"); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + + msgSize = task->msgLen; + msg = task->msg; + break; + } case TSDB_MSG_TYPE_QUERY: { if (NULL == task->msg) { qError("query msg is NULL"); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } - int32_t len = strlen(task->msg); - msgSize = sizeof(SSubQueryMsg) + len + 1; + msgSize = sizeof(SSubQueryMsg) + task->msgLen; msg = calloc(1, msgSize); if (NULL == msg) { qError("calloc %d failed", msgSize); @@ -308,11 +338,10 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { pMsg->schedulerId = htobe64(schMgmt.schedulerId); pMsg->queryId = htobe64(job->queryId); pMsg->taskId = htobe64(task->taskId); - pMsg->contentLen = htonl(len); - memcpy(pMsg->msg, task->msg, len); - pMsg->msg[len] = 0; + pMsg->contentLen = htonl(task->msgLen); + memcpy(pMsg->msg, task->msg, task->msgLen); break; - } + } case TSDB_MSG_TYPE_RES_READY: { msgSize = sizeof(SResReadyMsg); msg = calloc(1, msgSize); @@ -322,6 +351,7 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { } SResReadyMsg *pMsg = msg; + pMsg->schedulerId = htobe64(schMgmt.schedulerId); pMsg->queryId = htobe64(job->queryId); pMsg->taskId = htobe64(task->taskId); break; @@ -335,6 +365,21 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { } SResFetchMsg *pMsg = msg; + pMsg->schedulerId = htobe64(schMgmt.schedulerId); + pMsg->queryId = htobe64(job->queryId); + pMsg->taskId = htobe64(task->taskId); + break; + } + case TSDB_MSG_TYPE_DROP_TASK:{ + msgSize = sizeof(STaskDropMsg); + msg = calloc(1, msgSize); + if (NULL == msg) { + qError("calloc %d failed", msgSize); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + STaskDropMsg *pMsg = msg; + pMsg->schedulerId = htobe64(schMgmt.schedulerId); pMsg->queryId = htobe64(job->queryId); pMsg->taskId = htobe64(task->taskId); break; @@ -345,6 +390,7 @@ int32_t 
schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { } //TODO SEND MSG + //taosAsyncExec(rpcSendRequest(void * shandle, const SEpSet * pEpSet, SRpcMsg * pMsg, int64_t * pRid), p, &code); return TSDB_CODE_SUCCESS; } @@ -425,8 +471,29 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - strncpy(job->resEp.fqdn, task->execAddr.fqdn, sizeof(job->resEp.fqdn)); - job->resEp.port = task->execAddr.port; + int32_t taskDone = 0; + + if (SCH_TASK_NEED_WAIT_ALL(task)) { + SCH_LOCK(SCH_WRITE, &task->level->lock); + task->level->taskSucceed++; + taskDone = task->level->taskSucceed + task->level->taskFailed; + SCH_UNLOCK(SCH_WRITE, &task->level->lock); + + if (taskDone < task->level->taskNum) { + qDebug("wait all tasks, done:%d, all:%d", taskDone, task->level->taskNum); + return TSDB_CODE_SUCCESS; + } + + if (task->level->taskFailed > 0) { + job->status = JOB_TASK_STATUS_FAILED; + SCH_ERR_RET(schProcessOnJobFailure(job)); + + return TSDB_CODE_SUCCESS; + } + } else { + strncpy(job->resEp.fqdn, task->execAddr.fqdn, sizeof(job->resEp.fqdn)); + job->resEp.port = task->execAddr.port; + } SCH_ERR_RET(schProcessOnJobSuccess(job)); @@ -457,10 +524,30 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCode) { bool needRetry = false; + bool moved = false; + int32_t taskDone = 0; SCH_ERR_RET(schTaskCheckAndSetRetry(job, task, errCode, &needRetry)); if (!needRetry) { SCH_TASK_ERR_LOG("task failed[%x], no more retry", errCode); + + SCH_ERR_RET(schMoveTaskToFailList(job, task, &moved)); + if (!moved) { + SCH_TASK_ERR_LOG("task may already moved, status:%d", task->status); + return TSDB_CODE_SUCCESS; + } + + if (SCH_TASK_NEED_WAIT_ALL(task)) { + SCH_LOCK(SCH_WRITE, &task->level->lock); + task->level->taskFailed++; + taskDone = task->level->taskSucceed + task->level->taskFailed; + SCH_UNLOCK(SCH_WRITE, &task->level->lock); 
+ + if (taskDone < task->level->taskNum) { + qDebug("wait all tasks, done:%d, all:%d", taskDone, task->level->taskNum); + return TSDB_CODE_SUCCESS; + } + } job->status = JOB_TASK_STATUS_FAILED; SCH_ERR_RET(schProcessOnJobFailure(job)); @@ -522,8 +609,7 @@ _return: int32_t schLaunchTask(SQueryJob *job, SQueryTask *task) { SSubplan *plan = task->plan; - int32_t len = 0; - SCH_ERR_RET(qSubPlanToString(plan, &task->msg, &len)); + SCH_ERR_RET(qSubPlanToString(plan, &task->msg, &task->msgLen)); if (plan->execEpSet.numOfEps <= 0) { SCH_ERR_RET(schSetTaskExecEpSet(job, &plan->execEpSet)); } @@ -532,8 +618,10 @@ int32_t schLaunchTask(SQueryJob *job, SQueryTask *task) { SCH_TASK_ERR_LOG("invalid execEpSet num:%d", plan->execEpSet.numOfEps); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } + + int32_t msgType = (plan->type == QUERY_TYPE_MODIFY) ? TSDB_MSG_TYPE_SUBMIT : TSDB_MSG_TYPE_QUERY; - SCH_ERR_RET(schAsyncSendMsg(job, task, TSDB_MSG_TYPE_QUERY)); + SCH_ERR_RET(schAsyncSendMsg(job, task, msgType)); SCH_ERR_RET(schPushTaskToExecList(job, task)); @@ -554,6 +642,16 @@ int32_t schLaunchJob(SQueryJob *job) { return TSDB_CODE_SUCCESS; } +void schDropJobAllTasks(SQueryJob *job) { + void *pIter = taosHashIterate(job->succTasks, NULL); + while (pIter) { + SQueryTask *task = *(SQueryTask **)pIter; + + schAsyncSendMsg(job, task, TSDB_MSG_TYPE_DROP_TASK); + + pIter = taosHashIterate(job->succTasks, pIter); + } +} int32_t schedulerInit(SSchedulerCfg *cfg) { if (cfg) { @@ -562,8 +660,8 @@ schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER; } - schMgmt.Jobs = taosHashInit(schMgmt.cfg.maxJobNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); - if (NULL == schMgmt.Jobs) { + schMgmt.jobs = taosHashInit(schMgmt.cfg.maxJobNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); + if (NULL == schMgmt.jobs) { SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", 
schMgmt.cfg.maxJobNum); } @@ -605,9 +703,15 @@ int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, voi SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + job->failTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); + if (NULL == job->failTasks) { + qError("taosHashInit %d failed", pDag->numOfSubplans); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + tsem_init(&job->rspSem, 0, 0); - if (0 != taosHashPut(schMgmt.Jobs, &job->queryId, sizeof(job->queryId), &job, POINTER_BYTES)) { + if (0 != taosHashPut(schMgmt.jobs, &job->queryId, sizeof(job->queryId), &job, POINTER_BYTES)) { qError("taosHashPut queryId:%"PRIx64" failed", job->queryId); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); } @@ -659,6 +763,8 @@ _return: int32_t scheduleCancelJob(void *pJob) { //TODO + //TODO MOVE ALL TASKS FROM EXEC LIST TO FAIL LIST + return TSDB_CODE_SUCCESS; } @@ -670,7 +776,7 @@ void scheduleFreeJob(void *pJob) { SQueryJob *job = pJob; if (job->status > 0) { - if (0 != taosHashRemove(schMgmt.Jobs, &job->queryId, sizeof(job->queryId))) { + if (0 != taosHashRemove(schMgmt.jobs, &job->queryId, sizeof(job->queryId))) { qError("remove job:%"PRIx64"from mgmt failed", job->queryId); // maybe already freed return; } @@ -678,15 +784,17 @@ void scheduleFreeJob(void *pJob) { if (job->status == JOB_TASK_STATUS_EXECUTING) { scheduleCancelJob(pJob); } + + schDropJobAllTasks(job); } //TODO free job } void schedulerDestroy(void) { - if (schMgmt.Jobs) { - taosHashCleanup(schMgmt.Jobs); //TODO - schMgmt.Jobs = NULL; + if (schMgmt.jobs) { + taosHashCleanup(schMgmt.jobs); //TODO + schMgmt.jobs = NULL; } } -- GitLab