From 536e3aeb9db32a654cb0172687111d8d420ef317 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 27 Dec 2021 18:42:33 +0800 Subject: [PATCH] feature/qnode --- include/common/tmsg.h | 12 +- source/dnode/vnode/impl/src/vnodeQuery.c | 20 +- source/libs/qworker/inc/qworkerInt.h | 28 +- source/libs/qworker/src/qworker.c | 553 ++++++++++++---------- source/libs/qworker/test/qworkerTests.cpp | 35 +- source/libs/scheduler/inc/schedulerInt.h | 2 +- source/libs/scheduler/src/scheduler.c | 10 +- 7 files changed, 389 insertions(+), 271 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 92b453bc1f..a49ce5f6ce 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1002,7 +1002,7 @@ typedef struct { } SUpdateTagValRsp; typedef struct SSubQueryMsg { - uint64_t schedulerId; + uint64_t sId; uint64_t queryId; uint64_t taskId; uint32_t contentLen; @@ -1010,7 +1010,7 @@ typedef struct SSubQueryMsg { } SSubQueryMsg; typedef struct SResReadyMsg { - uint64_t schedulerId; + uint64_t sId; uint64_t queryId; uint64_t taskId; } SResReadyMsg; @@ -1020,13 +1020,13 @@ typedef struct SResReadyRsp { } SResReadyRsp; typedef struct SResFetchMsg { - uint64_t schedulerId; + uint64_t sId; uint64_t queryId; uint64_t taskId; } SResFetchMsg; typedef struct SSchTasksStatusMsg { - uint64_t schedulerId; + uint64_t sId; } SSchTasksStatusMsg; typedef struct STaskStatus { @@ -1041,7 +1041,7 @@ typedef struct SSchedulerStatusRsp { } SSchedulerStatusRsp; typedef struct STaskCancelMsg { - uint64_t schedulerId; + uint64_t sId; uint64_t queryId; uint64_t taskId; } STaskCancelMsg; @@ -1051,7 +1051,7 @@ typedef struct STaskCancelRsp { } STaskCancelRsp; typedef struct STaskDropMsg { - uint64_t schedulerId; + uint64_t sId; uint64_t queryId; uint64_t taskId; } STaskDropMsg; diff --git a/source/dnode/vnode/impl/src/vnodeQuery.c b/source/dnode/vnode/impl/src/vnodeQuery.c index 31481bf7c4..cc743d658e 100644 --- a/source/dnode/vnode/impl/src/vnodeQuery.c +++ b/source/dnode/vnode/impl/src/vnodeQuery.c @@ -22,13 +22,27 @@ int vnodeQueryOpen(SVnode *pVnode) { int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { vInfo("query message is processed"); - qWorkerProcessQueryMsg(pVnode, pVnode->pQuery, pMsg); - return 0; + return qWorkerProcessQueryMsg(pVnode, pVnode->pQuery, pMsg); } int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { vInfo("fetch message is processed"); - qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg); + switch (pMsg->msgType) { + case TDMT_VND_FETCH: + return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg); + case TDMT_VND_RES_READY: + return qWorkerProcessReadyMsg(pVnode, pVnode->pQuery, pMsg); + case TDMT_VND_TASKS_STATUS: + return qWorkerProcessStatusMsg(pVnode, pVnode->pQuery, pMsg); + case TDMT_VND_CANCEL_TASK: + return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg); + case TDMT_VND_DROP_TASK: + return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg); + default: + vError("unknown msg type:%d in fetch queue", pMsg->msgType); + return TSDB_CODE_VND_APP_ERROR; + break; + } return 0; } diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index 6f454e2f81..825e891c8b 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -42,25 +42,41 @@ enum { QW_WRITE, }; -typedef struct SQWorkerTaskStatus { +enum { + QW_EXIST_ACQUIRE = 1, + QW_EXIST_RET_ERR, +}; + +enum { + QW_NOT_EXIST_RET_ERR = 1, + QW_NOT_EXIST_ADD, +}; + +enum { + QW_ADD_RET_ERR = 1, + QW_ADD_ACQUIRE, +}; + + +typedef struct SQWTaskStatus { SRWLatch lock; int32_t code; int8_t status; int8_t ready; bool cancel; bool drop; -} SQWorkerTaskStatus; +} SQWTaskStatus; typedef struct SQWorkerResCache { SRWLatch lock; void *data; } SQWorkerResCache; -typedef struct SQWorkerSchStatus { +typedef struct SQWSchStatus { int32_t lastAccessTs; // timestamp in second SRWLatch tasksLock; SHashObj *tasksHash; // key:queryId+taskId, value: SQWorkerTaskStatus -} SQWorkerSchStatus; +} SQWSchStatus; // Qnode/Vnode level task management typedef struct SQWorkerMgmt { @@ -71,7 +87,7 @@ typedef struct SQWorkerMgmt { SHashObj *resHash; //key: queryId+taskId, value: SQWorkerResCache } SQWorkerMgmt; -#define QW_GOT_RES_DATA(data) (false) +#define QW_GOT_RES_DATA(data) (true) #define QW_LOW_RES_DATA(data) (false) #define QW_TASK_NOT_EXIST(code) (TSDB_CODE_QRY_SCH_NOT_EXIST == (code) || TSDB_CODE_QRY_TASK_NOT_EXIST == (code)) @@ -89,6 +105,8 @@ typedef struct SQWorkerMgmt { #define QW_LOCK(type, _lock) (QW_READ == (type) ? taosRLockLatch(_lock) : taosWLockLatch(_lock)) #define QW_UNLOCK(type, _lock) (QW_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock)) +static int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch, int32_t nOpt); + #ifdef __cplusplus } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 149f46273c..7a861a0b8f 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -4,38 +4,42 @@ #include "qworkerInt.h" #include "planner.h" -int32_t qwCheckStatusSwitch(int8_t oriStatus, int8_t newStatus) { +int32_t qwValidateStatus(int8_t oriStatus, int8_t newStatus) { int32_t code = 0; if (oriStatus == newStatus) { - if (newStatus == JOB_TASK_STATUS_CANCELLING) { - return TSDB_CODE_SUCCESS; - } - QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } switch (oriStatus) { case JOB_TASK_STATUS_NULL: - if (newStatus != JOB_TASK_STATUS_EXECUTING && newStatus != JOB_TASK_STATUS_FAILED ) { + if (newStatus != JOB_TASK_STATUS_EXECUTING + && newStatus != JOB_TASK_STATUS_FAILED + && newStatus != JOB_TASK_STATUS_NOT_START) { QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } break; case JOB_TASK_STATUS_NOT_START: - if (newStatus != JOB_TASK_STATUS_EXECUTING && newStatus != JOB_TASK_STATUS_FAILED) { + if (newStatus != JOB_TASK_STATUS_CANCELLED) { QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } break; case JOB_TASK_STATUS_EXECUTING: - if (newStatus != JOB_TASK_STATUS_SUCCEED && newStatus != JOB_TASK_STATUS_FAILED && newStatus != JOB_TASK_STATUS_CANCELLING) { + if (newStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED + && newStatus != JOB_TASK_STATUS_FAILED + && newStatus != JOB_TASK_STATUS_CANCELLING + && newStatus != JOB_TASK_STATUS_CANCELLED + && newStatus != JOB_TASK_STATUS_DROPPING) { QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } break; case JOB_TASK_STATUS_PARTIAL_SUCCEED: - if (newStatus != JOB_TASK_STATUS_EXECUTING && newStatus != JOB_TASK_STATUS_CANCELLING) { + if (newStatus != JOB_TASK_STATUS_EXECUTING + && newStatus != JOB_TASK_STATUS_SUCCEED + && newStatus != JOB_TASK_STATUS_CANCELLED) { QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } @@ -49,6 +53,10 @@ int32_t qwCheckStatusSwitch(int8_t oriStatus, int8_t newStatus) { break; case JOB_TASK_STATUS_CANCELLED: + case JOB_TASK_STATUS_DROPPING: + QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); + break; + default: qError("invalid task status:%d", oriStatus); return TSDB_CODE_QRY_APP_ERROR; @@ -58,17 +66,17 @@ int32_t qwCheckStatusSwitch(int8_t oriStatus, int8_t newStatus) { _return: - qError("invalid task status:%d", oriStatus); + qError("invalid task status, from %d to %d", oriStatus, newStatus); QW_ERR_RET(code); } -int32_t qwUpdateTaskInfo(SQWorkerTaskStatus *task, int8_t type, void *data) { +int32_t qwUpdateTaskInfo(SQWTaskStatus *task, int8_t type, void *data) { int32_t code = 0; switch (type) { case QW_TASK_INFO_STATUS: { int8_t newStatus = *(int8_t *)data; - QW_ERR_RET(qwCheckStatusSwitch(task->status, newStatus)); + QW_ERR_RET(qwValidateStatus(task->status, newStatus)); task->status = newStatus; break; } @@ -80,9 +88,9 @@ int32_t qwUpdateTaskInfo(SQWorkerTaskStatus *task, int8_t type, void *data) { return TSDB_CODE_SUCCESS; } -int32_t qwAddTaskResult(SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, void *data) { - char id[sizeof(queryId) + sizeof(taskId)] = {0}; - QW_SET_QTID(id, queryId, taskId); +int32_t qwAddTaskResCache(SQWorkerMgmt *mgmt, uint64_t qId, uint64_t tId, void *data) { + char id[sizeof(qId) + sizeof(tId)] = {0}; + QW_SET_QTID(id, qId, tId); SQWorkerResCache resCache = {0}; resCache.data = data; @@ -90,7 +98,7 @@ int32_t qwAddTaskResult(SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, v QW_LOCK(QW_WRITE, &mgmt->resLock); if (0 != taosHashPut(mgmt->resHash, id, sizeof(id), &resCache, sizeof(SQWorkerResCache))) { QW_UNLOCK(QW_WRITE, &mgmt->resLock); - qError("taosHashPut queryId[%"PRIx64"] taskId[%"PRIx64"] to resHash failed", queryId, taskId); + qError("taosHashPut queryId[%"PRIx64"] taskId[%"PRIx64"] to resHash failed", qId, tId); return TSDB_CODE_QRY_APP_ERROR; } @@ -99,37 +107,8 @@ int32_t qwAddTaskResult(SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, v return TSDB_CODE_SUCCESS; } - -int32_t qwGetTaskResult(SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, void **data) { - char id[sizeof(queryId) + sizeof(taskId)] = {0}; - QW_SET_QTID(id, queryId, taskId); - - SQWorkerResCache *resCache = taosHashGet(mgmt->resHash, id, sizeof(id)); - if (NULL == resCache) { - qError("no task res for queryId[%"PRIx64"] taskId[%"PRIx64"]", queryId, taskId); - return TSDB_CODE_QRY_APP_ERROR; - } - - *data = resCache->data; - - return TSDB_CODE_SUCCESS; -} - - -static FORCE_INLINE int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t schedulerId, SQWorkerSchStatus **sch) { - QW_LOCK(rwType, &mgmt->schLock); - *sch = taosHashGet(mgmt->schHash, &schedulerId, sizeof(schedulerId)); - if (NULL == (*sch)) { - QW_LOCK(rwType, &mgmt->schLock); - return TSDB_CODE_QRY_SCH_NOT_EXIST; - } - - return TSDB_CODE_SUCCESS; -} - - -static FORCE_INLINE int32_t qwInsertAndAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t schedulerId, SQWorkerSchStatus **sch) { - SQWorkerSchStatus newSch = {0}; +static int32_t qwAddScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch) { + SQWSchStatus newSch = {0}; newSch.tasksHash = taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); if (NULL == newSch.tasksHash) { qError("taosHashInit %d failed", mgmt->cfg.maxSchTaskNum); @@ -138,19 +117,18 @@ static FORCE_INLINE int32_t qwInsertAndAcquireScheduler(int32_t rwType, SQWorker while (true) { QW_LOCK(QW_WRITE, &mgmt->schLock); - int32_t code = taosHashPut(mgmt->schHash, &schedulerId, sizeof(schedulerId), &newSch, sizeof(newSch)); + int32_t code = taosHashPut(mgmt->schHash, &sId, sizeof(sId), &newSch, sizeof(newSch)); if (0 != code) { if (!HASH_NODE_EXIST(code)) { QW_UNLOCK(QW_WRITE, &mgmt->schLock); - qError("taosHashPut schedulerId[%"PRIx64"] to scheduleHash failed", schedulerId); + qError("taosHashPut sId[%"PRIx64"] to scheduleHash failed", sId); taosHashCleanup(newSch.tasksHash); return TSDB_CODE_QRY_APP_ERROR; } } QW_UNLOCK(QW_WRITE, &mgmt->schLock); - if (TSDB_CODE_SUCCESS == qwAcquireScheduler(rwType, mgmt, schedulerId, sch)) { - taosHashCleanup(newSch.tasksHash); + if (TSDB_CODE_SUCCESS == qwAcquireScheduler(rwType, mgmt, sId, sch, QW_NOT_EXIST_ADD)) { return TSDB_CODE_SUCCESS; } } @@ -159,63 +137,122 @@ static FORCE_INLINE int32_t qwInsertAndAcquireScheduler(int32_t rwType, SQWorker } +static int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch, int32_t nOpt) { + QW_LOCK(rwType, &mgmt->schLock); + *sch = taosHashGet(mgmt->schHash, &sId, sizeof(sId)); + if (NULL == (*sch)) { + QW_UNLOCK(rwType, &mgmt->schLock); + + if (QW_NOT_EXIST_ADD == nOpt) { + return qwAddScheduler(rwType, mgmt, sId, sch); + } else if (QW_NOT_EXIST_RET_ERR == nOpt) { + return TSDB_CODE_QRY_SCH_NOT_EXIST; + } else { + assert(0); + } + } + + return TSDB_CODE_SUCCESS; +} + + + static FORCE_INLINE void qwReleaseScheduler(int32_t rwType, SQWorkerMgmt *mgmt) { QW_UNLOCK(rwType, &mgmt->schLock); } -static FORCE_INLINE int32_t qwAcquireTask(int32_t rwType, SQWorkerSchStatus *sch, uint64_t queryId, uint64_t taskId, SQWorkerTaskStatus **task) { - char id[sizeof(queryId) + sizeof(taskId)] = {0}; - QW_SET_QTID(id, queryId, taskId); +static int32_t qwAcquireTaskImpl(int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t tId, SQWTaskStatus **task) { + char id[sizeof(qId) + sizeof(tId)] = {0}; + QW_SET_QTID(id, qId, tId); QW_LOCK(rwType, &sch->tasksLock); *task = taosHashGet(sch->tasksHash, id, sizeof(id)); if (NULL == (*task)) { QW_UNLOCK(rwType, &sch->tasksLock); + return TSDB_CODE_QRY_TASK_NOT_EXIST; } return TSDB_CODE_SUCCESS; } -static FORCE_INLINE int32_t qwInsertAndAcquireTask(int32_t rwType, SQWorkerSchStatus *sch, uint64_t queryId, uint64_t taskId, int8_t status, bool *inserted, SQWorkerTaskStatus **task) { - char id[sizeof(queryId) + sizeof(taskId)] = {0}; - QW_SET_QTID(id, queryId, taskId); +static int32_t qwAcquireTask(int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t tId, SQWTaskStatus **task) { + return qwAcquireTaskImpl(rwType, sch, qId, tId, task); +} + + +static FORCE_INLINE void qwReleaseTask(int32_t rwType, SQWSchStatus *sch) { + QW_UNLOCK(rwType, &sch->tasksLock); +} + + +int32_t qwAddTaskToSch(int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t tId, int8_t status, int32_t eOpt, SQWTaskStatus **task) { + int32_t code = 0; + + char id[sizeof(qId) + sizeof(tId)] = {0}; + QW_SET_QTID(id, qId, tId); + + SQWTaskStatus ntask = {0}; + ntask.status = status; while (true) { - *inserted = false; - QW_LOCK(QW_WRITE, &sch->tasksLock); - int32_t code = taosHashPut(sch->tasksHash, id, sizeof(id), &status, sizeof(status)); + int32_t code = taosHashPut(sch->tasksHash, id, sizeof(id), &ntask, sizeof(ntask)); if (0 != code) { QW_UNLOCK(QW_WRITE, &sch->tasksLock); if (HASH_NODE_EXIST(code)) { - if (qwAcquireTask(rwType, sch, queryId, taskId, task)) { - continue; + if (QW_EXIST_ACQUIRE == eOpt && rwType && task) { + if (qwAcquireTask(rwType, sch, qId, tId, task)) { + continue; + } + } else if (QW_EXIST_RET_ERR == eOpt) { + return TSDB_CODE_QRY_TASK_ALREADY_EXIST; + } else { + assert(0); } break; } else { - qError("taosHashPut queryId[%"PRIx64"] taskId[%"PRIx64"] to scheduleHash failed", queryId, taskId); + qError("taosHashPut queryId[%"PRIx64"] taskId[%"PRIx64"] to scheduleHash failed", qId, tId); return TSDB_CODE_QRY_APP_ERROR; } } + QW_UNLOCK(QW_WRITE, &sch->tasksLock); - *inserted = true; - - if (TSDB_CODE_SUCCESS == qwAcquireTask(rwType, sch, queryId, taskId, task)) { - return TSDB_CODE_SUCCESS; + if (rwType && task) { + if (TSDB_CODE_SUCCESS == qwAcquireTask(rwType, sch, qId, tId, task)) { + return TSDB_CODE_SUCCESS; + } + } else { + break; } - } + } return TSDB_CODE_SUCCESS; } -static FORCE_INLINE void qwReleaseTask(int32_t rwType, SQWorkerSchStatus *sch) { - QW_UNLOCK(rwType, &sch->tasksLock); +static int32_t qwAddTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t status, int32_t eOpt, SQWSchStatus **sch, SQWTaskStatus **task) { + SQWSchStatus *tsch = NULL; + QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &tsch, QW_NOT_EXIST_ADD)); + + int32_t code = qwAddTaskToSch(QW_READ, tsch, qId, tId, status, eOpt, task); + if (code) { + qwReleaseScheduler(QW_WRITE, mgmt); + } + + if (NULL == task) { + qwReleaseScheduler(QW_READ, mgmt); + } else if (sch) { + *sch = tsch; + } + + QW_RET(code); } + + static FORCE_INLINE int32_t qwAcquireTaskResCache(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, SQWorkerResCache **res) { char id[sizeof(queryId) + sizeof(taskId)] = {0}; QW_SET_QTID(id, queryId, taskId); @@ -235,27 +272,24 @@ static FORCE_INLINE void qwReleaseTaskResCache(int32_t rwType, SQWorkerMgmt *mgm } -int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, SSchedulerStatusRsp **rsp) { - SQWorkerSchStatus *schStatus = NULL; +int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp) { + SQWSchStatus *sch = NULL; int32_t taskNum = 0; - if (qwAcquireScheduler(QW_READ, mgmt, schedulerId, &schStatus)) { - qWarn("no scheduler for schedulerId[%"PRIx64"]", schedulerId); - } else { - schStatus->lastAccessTs = taosGetTimestampSec(); + QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)); + + sch->lastAccessTs = taosGetTimestampSec(); - QW_LOCK(QW_READ, &schStatus->tasksLock); - taskNum = taosHashGetSize(schStatus->tasksHash); - } + QW_LOCK(QW_READ, &sch->tasksLock); + + taskNum = taosHashGetSize(sch->tasksHash); int32_t size = sizeof(SSchedulerStatusRsp) + sizeof((*rsp)->status[0]) * taskNum; *rsp = calloc(1, size); if (NULL == *rsp) { qError("calloc %d failed", size); - if (schStatus) { - QW_UNLOCK(QW_READ, &schStatus->tasksLock); - qwReleaseScheduler(QW_READ, mgmt); - } + QW_UNLOCK(QW_READ, &sch->tasksLock); + qwReleaseScheduler(QW_READ, mgmt); return TSDB_CODE_QRY_OUT_OF_MEMORY; } @@ -264,23 +298,19 @@ int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, SScheduler size_t keyLen = 0; int32_t i = 0; - if (schStatus) { - void *pIter = taosHashIterate(schStatus->tasksHash, NULL); - while (pIter) { - SQWorkerTaskStatus *taskStatus = (SQWorkerTaskStatus *)pIter; - taosHashGetKey(pIter, &key, &keyLen); + void *pIter = taosHashIterate(sch->tasksHash, NULL); + while (pIter) { + SQWTaskStatus *taskStatus = (SQWTaskStatus *)pIter; + taosHashGetKey(pIter, &key, &keyLen); - QW_GET_QTID(key, (*rsp)->status[i].queryId, (*rsp)->status[i].taskId); - (*rsp)->status[i].status = taskStatus->status; - - pIter = taosHashIterate(schStatus->tasksHash, pIter); - } - } + QW_GET_QTID(key, (*rsp)->status[i].queryId, (*rsp)->status[i].taskId); + (*rsp)->status[i].status = taskStatus->status; + + pIter = taosHashIterate(sch->tasksHash, pIter); + } - if (schStatus) { - QW_UNLOCK(QW_READ, &schStatus->tasksLock); - qwReleaseScheduler(QW_READ, mgmt); - } + QW_UNLOCK(QW_READ, &sch->tasksLock); + qwReleaseScheduler(QW_READ, mgmt); (*rsp)->num = taskNum; @@ -289,115 +319,81 @@ int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, SScheduler -int32_t qwUpdateSchLastAccess(SQWorkerMgmt *mgmt, uint64_t schedulerId) { - SQWorkerSchStatus *schStatus = NULL; +int32_t qwUpdateSchLastAccess(SQWorkerMgmt *mgmt, uint64_t sId) { + SQWSchStatus *sch = NULL; - QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &schStatus)); + QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)); - schStatus->lastAccessTs = taosGetTimestampSec(); + sch->lastAccessTs = taosGetTimestampSec(); qwReleaseScheduler(QW_READ, mgmt); return TSDB_CODE_SUCCESS; } - -int32_t qwGetTaskStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, int8_t *taskStatus) { - SQWorkerSchStatus *sch = NULL; - SQWorkerTaskStatus *task = NULL; +int32_t qwUpdateTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t status) { + SQWSchStatus *sch = NULL; + SQWTaskStatus *task = NULL; int32_t code = 0; - - QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)); - QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task)); + QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)); - *taskStatus = task->status; + QW_ERR_JRET(qwAcquireTask(QW_READ, sch, qId, tId, &task)); + QW_LOCK(QW_WRITE, &task->lock); + qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &status); + QW_UNLOCK(QW_WRITE, &task->lock); + _return: - if (task) { - qwReleaseTask(QW_READ, sch); - } - if (sch) { - qwReleaseScheduler(QW_READ, mgmt); - } + qwReleaseTask(QW_READ, sch); + qwReleaseScheduler(QW_READ, mgmt); QW_RET(code); } -int32_t qwSwitchTaskStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, int8_t taskStatus) { - SQWorkerSchStatus *sch = NULL; - SQWorkerTaskStatus *task = NULL; +int32_t qwGetTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, int8_t *taskStatus) { + SQWSchStatus *sch = NULL; + SQWTaskStatus *task = NULL; int32_t code = 0; - bool inserted = false; - if (qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)) { - if (qwCheckStatusSwitch(JOB_TASK_STATUS_NULL, taskStatus)) { - qError("switch status error, not start to %d", taskStatus); - QW_ERR_RET(TSDB_CODE_QRY_APP_ERROR); - } - - QW_ERR_RET(qwInsertAndAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)); + if (qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)) { + *taskStatus = JOB_TASK_STATUS_NULL; + return TSDB_CODE_SUCCESS; } if (qwAcquireTask(QW_READ, sch, queryId, taskId, &task)) { - if (qwCheckStatusSwitch(JOB_TASK_STATUS_NOT_START, taskStatus)) { - qwReleaseScheduler(QW_READ, mgmt); - qError("switch status error, not start to %d", taskStatus); - QW_ERR_RET(TSDB_CODE_QRY_APP_ERROR); - } - - QW_ERR_JRET(qwInsertAndAcquireTask(QW_READ, sch, queryId, taskId, taskStatus, &inserted, &task)); - - if (inserted) { - qwReleaseTask(QW_READ, sch); - qwReleaseScheduler(QW_READ, mgmt); - return TSDB_CODE_SUCCESS; - } - - QW_LOCK(QW_WRITE, &task->lock); - code = qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &taskStatus); - QW_UNLOCK(QW_WRITE, &task->lock); - - qwReleaseTask(QW_READ, sch); - qwReleaseScheduler(QW_READ, mgmt); + qwReleaseScheduler(QW_READ, mgmt); - QW_RET(code); + *taskStatus = JOB_TASK_STATUS_NULL; + return TSDB_CODE_SUCCESS; } - QW_LOCK(QW_WRITE, &task->lock); - code = qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &taskStatus); - QW_UNLOCK(QW_WRITE, &task->lock); - -_return: + *taskStatus = task->status; qwReleaseTask(QW_READ, sch); - qwReleaseScheduler(QW_READ, mgmt); - + qwReleaseScheduler(QW_READ, mgmt); + QW_RET(code); } -int32_t qwCancelTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId) { - SQWorkerSchStatus *sch = NULL; - SQWorkerTaskStatus *task = NULL; +int32_t qwCancelTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId) { + SQWSchStatus *sch = NULL; + SQWTaskStatus *task = NULL; int32_t code = 0; - if (TSDB_CODE_SUCCESS != qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)) { - QW_ERR_RET(qwSwitchTaskStatus(mgmt, schedulerId, queryId, taskId, JOB_TASK_STATUS_NOT_START)); - - QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)); - } + QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_ADD)); if (qwAcquireTask(QW_READ, sch, queryId, taskId, &task)) { - code = qwSwitchTaskStatus(mgmt, schedulerId, queryId, taskId, JOB_TASK_STATUS_NOT_START); + qwReleaseScheduler(QW_READ, mgmt); + + code = qwAddTask(mgmt, sId, queryId, taskId, JOB_TASK_STATUS_NOT_START, QW_EXIST_ACQUIRE, &sch, &task); if (code) { qwReleaseScheduler(QW_READ, mgmt); QW_ERR_RET(code); } - - QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task)); } QW_LOCK(QW_WRITE, &task->lock); @@ -423,6 +419,7 @@ int32_t qwCancelTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, } QW_UNLOCK(QW_WRITE, &task->lock); + qwReleaseTask(QW_READ, sch); qwReleaseScheduler(QW_READ, mgmt); @@ -449,9 +446,9 @@ _return: -int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId) { - SQWorkerSchStatus *sch = NULL; - SQWorkerTaskStatus *task = NULL; +int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId) { + SQWSchStatus *sch = NULL; + SQWTaskStatus *task = NULL; int32_t code = 0; char id[sizeof(queryId) + sizeof(taskId)] = {0}; QW_SET_QTID(id, queryId, taskId); @@ -462,15 +459,15 @@ int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, u } QW_UNLOCK(QW_WRITE, &mgmt->resLock); - if (TSDB_CODE_SUCCESS != qwAcquireScheduler(QW_WRITE, mgmt, schedulerId, &sch)) { - qWarn("scheduler %"PRIx64" doesn't exist", schedulerId); + if (TSDB_CODE_SUCCESS != qwAcquireScheduler(QW_WRITE, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)) { + qWarn("scheduler %"PRIx64" doesn't exist", sId); return TSDB_CODE_SUCCESS; } if (qwAcquireTask(QW_WRITE, sch, queryId, taskId, &task)) { qwReleaseScheduler(QW_WRITE, mgmt); - qWarn("scheduler %"PRIx64" queryId %"PRIx64" taskId:%"PRIx64" doesn't exist", schedulerId, queryId, taskId); + qWarn("scheduler %"PRIx64" queryId %"PRIx64" taskId:%"PRIx64" doesn't exist", sId, queryId, taskId); return TSDB_CODE_SUCCESS; } @@ -483,21 +480,21 @@ int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, u } -int32_t qwCancelDropTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId) { - SQWorkerSchStatus *sch = NULL; - SQWorkerTaskStatus *task = NULL; +int32_t qwCancelDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId) { + SQWSchStatus *sch = NULL; + SQWTaskStatus *task = NULL; int32_t code = 0; - if (TSDB_CODE_SUCCESS != qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)) { - qWarn("scheduler %"PRIx64" doesn't exist", schedulerId); - return TSDB_CODE_SUCCESS; - } + QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_ADD)); if (qwAcquireTask(QW_READ, sch, queryId, taskId, &task)) { qwReleaseScheduler(QW_READ, mgmt); - qWarn("scheduler %"PRIx64" queryId %"PRIx64" taskId:%"PRIx64" doesn't exist", schedulerId, queryId, taskId); - return TSDB_CODE_SUCCESS; + code = qwAddTask(mgmt, sId, queryId, taskId, JOB_TASK_STATUS_NOT_START, QW_EXIST_ACQUIRE, &sch, &task); + if (code) { + qwReleaseScheduler(QW_READ, mgmt); + QW_ERR_RET(code); + } } QW_LOCK(QW_WRITE, &task->lock); @@ -508,7 +505,7 @@ int32_t qwCancelDropTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t quer int8_t newStatus = 0; if (task->status == JOB_TASK_STATUS_EXECUTING) { - newStatus = JOB_TASK_STATUS_CANCELLING; + newStatus = JOB_TASK_STATUS_DROPPING; QW_ERR_JRET(qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &newStatus)); } else if (task->status == JOB_TASK_STATUS_CANCELLING || task->status == JOB_TASK_STATUS_DROPPING || task->status == JOB_TASK_STATUS_NOT_START) { QW_UNLOCK(QW_WRITE, &task->lock); @@ -521,7 +518,7 @@ int32_t qwCancelDropTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t quer qwReleaseTask(QW_READ, sch); qwReleaseScheduler(QW_READ, mgmt); - QW_ERR_RET(qwDropTask(mgmt, schedulerId, queryId, taskId)); + QW_ERR_RET(qwDropTask(mgmt, sId, queryId, taskId)); return TSDB_CODE_SUCCESS; } @@ -673,12 +670,12 @@ int32_t qwBuildAndSendDropRsp(SRpcMsg *pMsg, int32_t code) { -int32_t qwCheckAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg, int32_t rspCode) { - SQWorkerSchStatus *sch = NULL; - SQWorkerTaskStatus *task = NULL; +int32_t qwCheckAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg, int32_t rspCode) { + SQWSchStatus *sch = NULL; + SQWTaskStatus *task = NULL; int32_t code = 0; - QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)); + QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)); QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task)); @@ -717,12 +714,12 @@ _return: QW_RET(code); } -int32_t qwSetAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) { - SQWorkerSchStatus *sch = NULL; - SQWorkerTaskStatus *task = NULL; +int32_t qwSetAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) { + SQWSchStatus *sch = NULL; + SQWTaskStatus *task = NULL; int32_t code = 0; - QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)); + QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)); QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task)); @@ -756,15 +753,15 @@ _return: QW_RET(code); } -int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, bool *needStop) { - SQWorkerSchStatus *sch = NULL; - SQWorkerTaskStatus *task = NULL; +int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, bool *needStop) { + SQWSchStatus *sch = NULL; + SQWTaskStatus *task = NULL; int32_t code = 0; int8_t status = JOB_TASK_STATUS_CANCELLED; *needStop = false; - if (qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)) { + if (qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)) { return TSDB_CODE_SUCCESS; } @@ -776,11 +773,13 @@ int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_ QW_LOCK(QW_READ, &task->lock); if ((!task->cancel) && (!task->drop)) { + qError("no cancel or drop, but task:%"PRIx64" exists", taskId); + QW_UNLOCK(QW_READ, &task->lock); qwReleaseTask(QW_READ, sch); qwReleaseScheduler(QW_READ, mgmt); - return TSDB_CODE_SUCCESS; + QW_RET(TSDB_CODE_QRY_APP_ERROR); } QW_UNLOCK(QW_READ, &task->lock); @@ -791,30 +790,40 @@ int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_ QW_LOCK(QW_WRITE, &task->lock); qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &status); QW_UNLOCK(QW_WRITE, &task->lock); - } else if (task->drop) { + } + + if (task->drop) { qwReleaseTask(QW_READ, sch); qwReleaseScheduler(QW_READ, mgmt); - qwDropTask(mgmt, schedulerId, queryId, taskId); + return qwDropTask(mgmt, sId, queryId, taskId); } + qwReleaseTask(QW_READ, sch); + qwReleaseScheduler(QW_READ, mgmt); + return TSDB_CODE_SUCCESS; } -int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) { - SQWorkerSchStatus *sch = NULL; - SQWorkerTaskStatus *task = NULL; +int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) { + SQWSchStatus *sch = NULL; + SQWTaskStatus *task = NULL; int32_t code = 0; int32_t needRsp = true; void *data = NULL; - QW_ERR_JRET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)); + QW_ERR_JRET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)); QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task)); QW_LOCK(QW_READ, &task->lock); - if (task->status != JOB_TASK_STATUS_EXECUTING && task->status != JOB_TASK_STATUS_PARTIAL_SUCCEED && task->status != JOB_TASK_STATUS_SUCCEED) { + if (task->cancel || task->drop) { + qError("task is already cancelled or dropped"); + QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); + } + + if (task->status != JOB_TASK_STATUS_EXECUTING && task->status != JOB_TASK_STATUS_PARTIAL_SUCCEED) { qError("invalid status %d for fetch", task->status); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } @@ -854,37 +863,46 @@ _return: QW_RET(code); } -int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, int8_t status, int32_t errCode) { - SQWorkerSchStatus *sch = NULL; - SQWorkerTaskStatus *task = NULL; +int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t status, int32_t errCode) { + SQWSchStatus *sch = NULL; + SQWTaskStatus *task = NULL; int32_t code = 0; int8_t newStatus = JOB_TASK_STATUS_CANCELLED; - code = qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch); + code = qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_ADD); if (code) { - qError("schedulerId:%"PRIx64" not in cache", schedulerId); + qError("sId:%"PRIx64" not in cache", sId); QW_ERR_RET(code); } - code = qwAcquireTask(QW_READ, sch, queryId, taskId, &task); + code = qwAcquireTask(QW_READ, sch, qId, tId, &task); if (code) { qwReleaseScheduler(QW_READ, mgmt); - qError("schedulerId:%"PRIx64" queryId:%"PRIx64" taskId:%"PRIx64" not in cache", schedulerId, queryId, taskId); - QW_ERR_RET(code); + + if (JOB_TASK_STATUS_PARTIAL_SUCCEED == status || JOB_TASK_STATUS_SUCCEED == status) { + qError("sId:%"PRIx64" queryId:%"PRIx64" taskId:%"PRIx64" not in cache", sId, qId, tId); + QW_ERR_RET(code); + } + + QW_ERR_RET(qwAddTask(mgmt, sId, qId, tId, status, QW_EXIST_ACQUIRE, &sch, &task)); } if (task->cancel) { QW_LOCK(QW_WRITE, &task->lock); qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &newStatus); QW_UNLOCK(QW_WRITE, &task->lock); - } else if (task->drop) { + } + + if (task->drop) { qwReleaseTask(QW_READ, sch); qwReleaseScheduler(QW_READ, mgmt); - qwDropTask(mgmt, schedulerId, queryId, taskId); + qwDropTask(mgmt, sId, qId, tId); return TSDB_CODE_SUCCESS; - } else { + } + + if (!(task->cancel || task->drop)) { QW_LOCK(QW_WRITE, &task->lock); qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &status); task->code = errCode; @@ -938,24 +956,24 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } + int32_t code = 0; SSubQueryMsg *msg = pMsg->pCont; if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { qError("invalid query msg"); - QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + QW_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } - msg->schedulerId = htobe64(msg->schedulerId); + msg->sId = htobe64(msg->sId); msg->queryId = htobe64(msg->queryId); msg->taskId = htobe64(msg->taskId); msg->contentLen = ntohl(msg->contentLen); bool queryDone = false; - bool queryRsp = false; + bool queryRsped = false; bool needStop = false; SSubplan *plan = NULL; - int32_t code = 0; - QW_ERR_JRET(qwCheckTaskCancelDrop(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, &needStop)); + QW_ERR_JRET(qwCheckTaskCancelDrop(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, &needStop)); if (needStop) { qWarn("task need stop"); QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED); @@ -963,7 +981,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { code = qStringToSubplan(msg->msg, &plan); if (TSDB_CODE_SUCCESS != code) { - qError("schId:%"PRIx64",qId:%"PRIx64",taskId:%"PRIx64" string to subplan failed, code:%d", msg->schedulerId, msg->queryId, msg->taskId, code); + qError("schId:%"PRIx64",qId:%"PRIx64",taskId:%"PRIx64" string to subplan failed, code:%d", msg->sId, msg->queryId, msg->taskId, code); QW_ERR_JRET(code); } @@ -974,12 +992,12 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { if (code) { QW_ERR_JRET(code); } else { - QW_ERR_JRET(qwSwitchTaskStatus(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, JOB_TASK_STATUS_EXECUTING)); + QW_ERR_JRET(qwAddTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, JOB_TASK_STATUS_EXECUTING, QW_EXIST_RET_ERR, NULL, NULL)); } QW_ERR_JRET(qwBuildAndSendQueryRsp(pMsg, TSDB_CODE_SUCCESS)); - queryRsp = true; + queryRsped = true; //TODO call executer to execute subquery code = 0; @@ -990,29 +1008,29 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { if (code) { QW_ERR_JRET(code); } else { - QW_ERR_JRET(qwAddTaskResult(qWorkerMgmt, msg->queryId, msg->taskId, data)); + QW_ERR_JRET(qwAddTaskResCache(qWorkerMgmt, msg->queryId, msg->taskId, data)); - QW_ERR_JRET(qwSwitchTaskStatus(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, JOB_TASK_STATUS_PARTIAL_SUCCEED)); - } + QW_ERR_JRET(qwUpdateTaskStatus(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, JOB_TASK_STATUS_PARTIAL_SUCCEED)); + } _return: - if (queryRsp) { - code = qwCheckAndSendReadyRsp(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, pMsg, code); + if (queryRsped) { + code = qwCheckAndSendReadyRsp(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, pMsg, code); } else { code = qwBuildAndSendQueryRsp(pMsg, code); } int8_t status = 0; - if (TSDB_CODE_SUCCESS != code || queryDone) { - if (code) { - status = JOB_TASK_STATUS_FAILED; //TODO set CANCELLED from code - } else { - status = JOB_TASK_STATUS_SUCCEED; - } - - qwQueryPostProcess(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, status, code); + if (TSDB_CODE_SUCCESS != code) { + status = JOB_TASK_STATUS_FAILED; + } else if (queryDone) { + status = JOB_TASK_STATUS_SUCCEED; + } else { + status = JOB_TASK_STATUS_PARTIAL_SUCCEED; } + + qwQueryPostProcess(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, status, code); QW_RET(code); } @@ -1023,12 +1041,16 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){ } SResReadyMsg *msg = pMsg->pCont; - if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { + if (NULL == msg || pMsg->contLen < sizeof(*msg)) { qError("invalid task status msg"); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - QW_ERR_RET(qwSetAndSendReadyRsp(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, pMsg)); + msg->sId = htobe64(msg->sId); + msg->queryId = htobe64(msg->queryId); + msg->taskId = htobe64(msg->taskId); + + QW_ERR_RET(qwSetAndSendReadyRsp(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, pMsg)); return TSDB_CODE_SUCCESS; } @@ -1040,14 +1062,16 @@ int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t code = 0; SSchTasksStatusMsg *msg = pMsg->pCont; - if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { + if (NULL == msg || pMsg->contLen < sizeof(*msg)) { qError("invalid task status msg"); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } + msg->sId = htobe64(msg->sId); + SSchedulerStatusRsp *sStatus = NULL; - QW_ERR_JRET(qwGetSchTasksStatus(qWorkerMgmt, msg->schedulerId, &sStatus)); + QW_ERR_JRET(qwGetSchTasksStatus(qWorkerMgmt, msg->sId, &sStatus)); _return: @@ -1062,11 +1086,15 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { } SResFetchMsg *msg = pMsg->pCont; - if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { + if (NULL == msg || pMsg->contLen < sizeof(*msg)) { QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - QW_ERR_RET(qwUpdateSchLastAccess(qWorkerMgmt, msg->schedulerId)); + msg->sId = htobe64(msg->sId); + msg->queryId = htobe64(msg->queryId); + msg->taskId = htobe64(msg->taskId); + + QW_ERR_RET(qwUpdateSchLastAccess(qWorkerMgmt, msg->sId)); void *data = NULL; SQWorkerResCache *res = NULL; @@ -1074,7 +1102,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { QW_ERR_RET(qwAcquireTaskResCache(QW_READ, qWorkerMgmt, msg->queryId, msg->taskId, &res)); - QW_ERR_JRET(qwHandleFetch(res, qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, pMsg)); + QW_ERR_JRET(qwHandleFetch(res, qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, pMsg)); _return: @@ -1090,12 +1118,16 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t code = 0; STaskCancelMsg *msg = pMsg->pCont; - if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { + if (NULL == msg || pMsg->contLen < sizeof(*msg)) { qError("invalid task cancel msg"); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId)); + msg->sId = htobe64(msg->sId); + msg->queryId = htobe64(msg->queryId); + msg->taskId = htobe64(msg->taskId); + + QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId)); _return: @@ -1111,12 +1143,16 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t code = 0; STaskDropMsg *msg = pMsg->pCont; - if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { + if (NULL == msg || pMsg->contLen < sizeof(*msg)) { qError("invalid task drop msg"); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - QW_ERR_JRET(qwCancelDropTask(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId)); + msg->sId = htobe64(msg->sId); + msg->queryId = htobe64(msg->queryId); + msg->taskId = htobe64(msg->taskId); + + QW_ERR_JRET(qwCancelDropTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId)); _return: @@ -1125,6 +1161,31 @@ _return: return TSDB_CODE_SUCCESS; } +int32_t qWorkerContinueQuery(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { + int32_t code = 0; + int8_t status = 0; + bool queryDone = false; + uint64_t sId, qId, tId; + + //TODO call executer to continue execute subquery + code = 0; + void *data = NULL; + queryDone = false; + //TODO call executer to continue execute subquery + + if (TSDB_CODE_SUCCESS != code) { + status = JOB_TASK_STATUS_FAILED; + } else if (queryDone) { + status = JOB_TASK_STATUS_SUCCEED; + } else { + status = JOB_TASK_STATUS_PARTIAL_SUCCEED; + } + + code = qwQueryPostProcess(qWorkerMgmt, sId, qId, tId, status, code); + + QW_RET(code); +} + void qWorkerDestroy(void **qWorkerMgmt) { if (NULL == qWorkerMgmt || NULL == *qWorkerMgmt) { diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp index 4b54b77544..d63d40b4f4 100644 --- a/source/libs/qworker/test/qworkerTests.cpp +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -40,6 +40,11 @@ int32_t qwtStringToPlan(const char* str, SSubplan** subplan) { return 0; } +void qwtRpcSendResponse(const SRpcMsg *pRsp) { + return; +} + + void stubSetStringToPlan() { static Stub stub; @@ -54,6 +59,20 @@ void stubSetStringToPlan() { } } +void stubSetRpcSendResponse() { + static Stub stub; + stub.set(rpcSendResponse, qwtRpcSendResponse); + { + AddrAny any("libplanner.so"); + std::map result; + any.get_global_func_addr_dynsym("^rpcSendResponse$", result); + for (const auto& f : result) { + stub.set(f.second, qwtRpcSendResponse); + } + } +} + + } @@ -68,30 +87,35 @@ TEST(testCase, normalCase) { SRpcMsg dropRpc = {0}; SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100); queryMsg->queryId = htobe64(1); - queryMsg->schedulerId = htobe64(1); + queryMsg->sId = htobe64(1); queryMsg->taskId = htobe64(1); queryMsg->contentLen = htonl(100); queryRpc.pCont = queryMsg; + queryRpc.contLen = sizeof(SSubQueryMsg) + 100; SResReadyMsg readyMsg = {0}; - readyMsg.schedulerId = htobe64(1); + readyMsg.sId = htobe64(1); readyMsg.queryId = htobe64(1); readyMsg.taskId = htobe64(1); readyRpc.pCont = &readyMsg; + readyRpc.contLen = sizeof(SResReadyMsg); SResFetchMsg fetchMsg = {0}; - fetchMsg.schedulerId = htobe64(1); + fetchMsg.sId = htobe64(1); fetchMsg.queryId = htobe64(1); fetchMsg.taskId = htobe64(1); fetchRpc.pCont = &fetchMsg; + fetchRpc.contLen = sizeof(SResFetchMsg); STaskDropMsg dropMsg = {0}; - dropMsg.schedulerId = htobe64(1); + dropMsg.sId = htobe64(1); dropMsg.queryId = htobe64(1); dropMsg.taskId = htobe64(1); dropRpc.pCont = &dropMsg; + dropRpc.contLen = sizeof(STaskDropMsg); stubSetStringToPlan(); + stubSetRpcSendResponse(); code = qWorkerInit(NULL, &mgmt); ASSERT_EQ(code, 0); @@ -107,7 +131,8 @@ TEST(testCase, normalCase) { code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc); ASSERT_EQ(code, 0); - + + qWorkerDestroy(&mgmt); } diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 2381a1dd49..d5833c3adf 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -38,7 +38,7 @@ enum { typedef struct SSchedulerMgmt { uint64_t taskId; - uint64_t schedulerId; + uint64_t sId; SSchedulerCfg cfg; SHashObj *jobs; // key: queryId, value: SQueryJob* } SSchedulerMgmt; diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 503383f4b1..3cea97381f 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -343,7 +343,7 @@ int32_t schAsyncSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { SSubQueryMsg *pMsg = msg; - pMsg->schedulerId = htobe64(schMgmt.schedulerId); + pMsg->sId = htobe64(schMgmt.sId); pMsg->queryId = htobe64(job->queryId); pMsg->taskId = htobe64(task->taskId); pMsg->contentLen = htonl(task->msgLen); @@ -359,7 +359,7 @@ int32_t schAsyncSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { } SResReadyMsg *pMsg = msg; - pMsg->schedulerId = htobe64(schMgmt.schedulerId); + pMsg->sId = htobe64(schMgmt.sId); pMsg->queryId = htobe64(job->queryId); pMsg->taskId = htobe64(task->taskId); break; @@ -376,7 +376,7 @@ int32_t schAsyncSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { } SResFetchMsg *pMsg = msg; - pMsg->schedulerId = htobe64(schMgmt.schedulerId); + pMsg->sId = htobe64(schMgmt.sId); pMsg->queryId = htobe64(job->queryId); pMsg->taskId = htobe64(task->taskId); break; @@ -390,7 +390,7 @@ int32_t schAsyncSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { } STaskDropMsg *pMsg = msg; - pMsg->schedulerId = htobe64(schMgmt.schedulerId); + pMsg->sId = htobe64(schMgmt.sId); pMsg->queryId = htobe64(job->queryId); pMsg->taskId = htobe64(task->taskId); break; @@ -717,7 +717,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", schMgmt.cfg.maxJobNum); } - schMgmt.schedulerId = 1; //TODO GENERATE A UUID + schMgmt.sId = 1; //TODO GENERATE A UUID return TSDB_CODE_SUCCESS; } -- GitLab