From fddd11ed470e043f85eb198d40ce6295256decda Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 24 Dec 2021 09:41:09 +0800 Subject: [PATCH] feature/qnode --- include/common/taosmsg.h | 12 ++ include/libs/qworker/qworker.h | 12 +- source/dnode/vnode/impl/CMakeLists.txt | 3 +- source/dnode/vnode/impl/inc/vnodeDef.h | 2 + source/dnode/vnode/impl/inc/vnodeQuery.h | 31 +++++ source/dnode/vnode/impl/src/vnodeInt.c | 10 -- source/dnode/vnode/impl/src/vnodeMain.c | 5 + source/dnode/vnode/impl/src/vnodeQuery.c | 35 ++++++ source/dnode/vnode/meta/inc/metaQuery.h | 6 +- source/libs/qworker/src/qworker.c | 149 +++++++++++++++++++---- source/libs/scheduler/inc/schedulerInt.h | 25 ++-- source/libs/scheduler/src/scheduler.c | 13 +- 12 files changed, 246 insertions(+), 57 deletions(-) create mode 100644 source/dnode/vnode/impl/inc/vnodeQuery.h create mode 100644 source/dnode/vnode/impl/src/vnodeQuery.c diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 8fd1ca9e9f..8cafa39d8c 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -1121,6 +1121,10 @@ typedef struct SResReadyMsg { uint64_t taskId; } SResReadyMsg; +typedef struct SResReadyRsp { + int32_t code; +} SResReadyRsp; + typedef struct SResFetchMsg { uint64_t schedulerId; uint64_t queryId; @@ -1149,12 +1153,20 @@ typedef struct STaskCancelMsg { uint64_t taskId; } STaskCancelMsg; +typedef struct STaskCancelRsp { + int32_t code; +} STaskCancelRsp; + typedef struct STaskDropMsg { uint64_t schedulerId; uint64_t queryId; uint64_t taskId; } STaskDropMsg; +typedef struct STaskDropRsp { + int32_t code; +} STaskDropRsp; + #pragma pack(pop) diff --git a/include/libs/qworker/qworker.h b/include/libs/qworker/qworker.h index 8e36178497..83047a44de 100644 --- a/include/libs/qworker/qworker.h +++ b/include/libs/qworker/qworker.h @@ -42,17 +42,17 @@ typedef struct { int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt); -int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg **rsp); +int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); -int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp); +int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); -int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp); +int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); -int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp); +int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); -int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp); +int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); -int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp); +int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); void qWorkerDestroy(void **qWorkerMgmt); diff --git a/source/dnode/vnode/impl/CMakeLists.txt b/source/dnode/vnode/impl/CMakeLists.txt index 6972605afd..9e892bc4c4 100644 --- a/source/dnode/vnode/impl/CMakeLists.txt +++ b/source/dnode/vnode/impl/CMakeLists.txt @@ -15,9 +15,10 @@ target_link_libraries( PUBLIC wal PUBLIC sync PUBLIC cjson + PUBLIC qworker ) # test if(${BUILD_TEST}) add_subdirectory(test) -endif(${BUILD_TEST}) \ No newline at end of file +endif(${BUILD_TEST}) diff --git a/source/dnode/vnode/impl/inc/vnodeDef.h b/source/dnode/vnode/impl/inc/vnodeDef.h index 605557d4ea..c5a57b02a6 100644 --- a/source/dnode/vnode/impl/inc/vnodeDef.h +++ b/source/dnode/vnode/impl/inc/vnodeDef.h @@ -34,6 +34,7 @@ #include "vnodeRequest.h" #include "vnodeStateMgr.h" #include "vnodeSync.h" +#include "vnodeQuery.h" #ifdef __cplusplus extern "C" { @@ -72,6 +73,7 @@ struct SVnode { SVnodeSync* pSync; SVnodeFS* pFs; tsem_t canCommit; + void* pQuery; }; int vnodeScheduleTask(SVnodeTask* task); diff --git a/source/dnode/vnode/impl/inc/vnodeQuery.h b/source/dnode/vnode/impl/inc/vnodeQuery.h new file mode 100644 index 0000000000..59bab42f62 --- /dev/null +++ b/source/dnode/vnode/impl/inc/vnodeQuery.h @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_VNODE_READ_H_ +#define _TD_VNODE_READ_H_ + +#ifdef __cplusplus +extern "C" { +#endif +#include "vnodeInt.h" +#include "qworker.h" + +int vnodeQueryOpen(SVnode *pVnode); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_VNODE_READ_H_*/ diff --git a/source/dnode/vnode/impl/src/vnodeInt.c b/source/dnode/vnode/impl/src/vnodeInt.c index 5deaffe6d2..65185f4a16 100644 --- a/source/dnode/vnode/impl/src/vnodeInt.c +++ b/source/dnode/vnode/impl/src/vnodeInt.c @@ -24,16 +24,6 @@ int32_t vnodeSync(SVnode *pVnode) { return 0; } int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { return 0; } -int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { - vInfo("query message is processed"); - return 0; -} - -int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { - vInfo("fetch message is processed"); - return 0; -} - int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { vInfo("sync message is processed"); return 0; diff --git a/source/dnode/vnode/impl/src/vnodeMain.c b/source/dnode/vnode/impl/src/vnodeMain.c index c98f3e0800..2b0363c97f 100644 --- a/source/dnode/vnode/impl/src/vnodeMain.c +++ b/source/dnode/vnode/impl/src/vnodeMain.c @@ -127,6 +127,11 @@ static int vnodeOpenImpl(SVnode *pVnode) { return -1; } + // Open Query + if (vnodeQueryOpen(pVnode)) { + return -1; + } + // TODO return 0; } diff --git a/source/dnode/vnode/impl/src/vnodeQuery.c b/source/dnode/vnode/impl/src/vnodeQuery.c new file mode 100644 index 0000000000..31481bf7c4 --- /dev/null +++ b/source/dnode/vnode/impl/src/vnodeQuery.c @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "vnodeDef.h" +#include "vnodeQuery.h" + +int vnodeQueryOpen(SVnode *pVnode) { + return qWorkerInit(NULL, &pVnode->pQuery); +} + +int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { + vInfo("query message is processed"); + qWorkerProcessQueryMsg(pVnode, pVnode->pQuery, pMsg); + return 0; +} + +int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { + vInfo("fetch message is processed"); + qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg); + return 0; +} + + diff --git a/source/dnode/vnode/meta/inc/metaQuery.h b/source/dnode/vnode/meta/inc/metaQuery.h index 110df8dd45..ca3b68b415 100644 --- a/source/dnode/vnode/meta/inc/metaQuery.h +++ b/source/dnode/vnode/meta/inc/metaQuery.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef _TD_META_QUERY_H_ -#define _TD_META_QUERY_H_ +#ifndef _VNODE_QUERY_H_ +#define _VNODE_QUERY_H_ #ifdef __cplusplus extern "C" { @@ -24,4 +24,4 @@ extern "C" { } #endif -#endif /*_TD_META_QUERY_H_*/ \ No newline at end of file +#endif /*_VNODE_QUERY_H_*/ \ No newline at end of file diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 6955da8a8c..37d3e655c2 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -558,6 +558,7 @@ int32_t qwBuildAndSendQueryRsp(SRpcMsg *pMsg, int32_t code) { SRpcMsg rpcRsp = { .handle = pMsg->handle, + .ahandle = pMsg->ahandle, .pCont = pRsp, .contLen = sizeof(*pRsp), .code = code, @@ -569,24 +570,105 @@ int32_t qwBuildAndSendQueryRsp(SRpcMsg *pMsg, int32_t code) { } int32_t qwBuildAndSendReadyRsp(SRpcMsg *pMsg, int32_t code) { + SResReadyRsp *pRsp = (SResReadyRsp *)rpcMallocCont(sizeof(SResReadyRsp)); + pRsp->code = code; + + SRpcMsg rpcRsp = { + .handle = pMsg->handle, + .ahandle = pMsg->ahandle, + .pCont = pRsp, + .contLen = sizeof(*pRsp), + .code = code, + }; + + rpcSendResponse(&rpcRsp); + return TSDB_CODE_SUCCESS; } int32_t qwBuildAndSendStatusRsp(SRpcMsg *pMsg, SSchedulerStatusRsp *sStatus) { + int32_t size = 0; + + if (sStatus) { + size = sizeof(SSchedulerStatusRsp) + sizeof(sStatus->status[0]) * sStatus->num; + } else { + size = sizeof(SSchedulerStatusRsp); + } + + SSchedulerStatusRsp *pRsp = (SSchedulerStatusRsp *)rpcMallocCont(size); + if (sStatus) { + memcpy(pRsp, sStatus, size); + } else { + pRsp->num = 0; + } + + SRpcMsg rpcRsp = { + .handle = pMsg->handle, + .ahandle = pMsg->ahandle, + .pCont = pRsp, + .contLen = size, + .code = 0, + }; + + rpcSendResponse(&rpcRsp); + + return TSDB_CODE_SUCCESS; } int32_t qwBuildAndSendFetchRsp(SRpcMsg *pMsg, void *data) { + SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); + memset(pRsp, 0, sizeof(SRetrieveTableRsp)); + + //TODO fill msg + pRsp->completed = true; + + SRpcMsg rpcRsp = { + .handle = pMsg->handle, + .ahandle = pMsg->ahandle, + .pCont = pRsp, + .contLen = sizeof(*pRsp), + .code = 0, + }; + rpcSendResponse(&rpcRsp); + + return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg) { +int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg, int32_t code) { + STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp)); + pRsp->code = code; + SRpcMsg rpcRsp = { + .handle = pMsg->handle, + .ahandle = pMsg->ahandle, + .pCont = pRsp, + .contLen = sizeof(*pRsp), + .code = code, + }; + + rpcSendResponse(&rpcRsp); + + return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendDropRsp(SRpcMsg *pMsg) { +int32_t qwBuildAndSendDropRsp(SRpcMsg *pMsg, int32_t code) { + STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp)); + pRsp->code = code; + + SRpcMsg rpcRsp = { + .handle = pMsg->handle, + .ahandle = pMsg->ahandle, + .pCont = pRsp, + .contLen = sizeof(*pRsp), + .code = code, + }; + + rpcSendResponse(&rpcRsp); + return TSDB_CODE_SUCCESS; } @@ -724,8 +806,10 @@ int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t schedu SQWorkerSchStatus *sch = NULL; SQWorkerTaskStatus *task = NULL; int32_t code = 0; + int32_t needRsp = true; + void *data = NULL; - QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)); + QW_ERR_JRET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)); QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task)); QW_LOCK(QW_READ, &task->lock); @@ -736,7 +820,7 @@ int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t schedu } if (QW_GOT_RES_DATA(res->data)) { - QW_ERR_JRET(qwBuildAndSendFetchRsp(pMsg, res->data)); + data = res->data; if (QW_LOW_RES_DATA(res->data)) { if (task->status == JOB_TASK_STATUS_PARTIAL_SUCCEED) { //TODO add query back to queue @@ -749,6 +833,8 @@ int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t schedu } //TODO SET FLAG FOR QUERY TO SEND RSP WHEN RES READY + + needRsp = false; } _return: @@ -758,9 +844,12 @@ _return: if (sch) { qwReleaseTask(QW_READ, sch); + qwReleaseScheduler(QW_READ, mgmt); } - qwReleaseScheduler(QW_READ, mgmt); + if (needRsp) { + qwBuildAndSendFetchRsp(pMsg, res->data); + } QW_RET(code); } @@ -844,13 +933,14 @@ int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt) { return TSDB_CODE_SUCCESS; } -int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg **rsp) { - if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == rsp) { +int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { + if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } SSubQueryMsg *msg = pMsg->pCont; if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { + qError("invalid query msg"); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -863,7 +953,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRp QW_ERR_JRET(qwCheckTaskCancelDrop(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, &needStop)); if (needStop) { qWarn("task need stop"); - QW_ERR_RET(TSDB_CODE_QRY_TASK_CANCELLED); + QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED); } code = qStringToSubplan(msg->msg, &plan); @@ -922,13 +1012,14 @@ _return: QW_RET(code); } -int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp){ - if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == rsp) { +int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){ + if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { return TSDB_CODE_QRY_INVALID_INPUT; } SResReadyMsg *msg = pMsg->pCont; if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { + qError("invalid task status msg"); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -937,27 +1028,31 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRp return TSDB_CODE_SUCCESS; } -int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp) { - if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == rsp) { +int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { + if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { return TSDB_CODE_QRY_INVALID_INPUT; } + int32_t code = 0; SSchTasksStatusMsg *msg = pMsg->pCont; if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { + qError("invalid task status msg"); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } SSchedulerStatusRsp *sStatus = NULL; - QW_ERR_RET(qwGetSchTasksStatus(qWorkerMgmt, msg->schedulerId, &sStatus)); + QW_ERR_JRET(qwGetSchTasksStatus(qWorkerMgmt, msg->schedulerId, &sStatus)); + +_return: QW_ERR_RET(qwBuildAndSendStatusRsp(pMsg, sStatus)); return TSDB_CODE_SUCCESS; } -int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp) { - if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == rsp) { +int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { + if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { return TSDB_CODE_QRY_INVALID_INPUT; } @@ -983,36 +1078,44 @@ _return: QW_RET(code); } -int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp) { - if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == rsp) { +int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { + if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { return TSDB_CODE_QRY_INVALID_INPUT; } + int32_t code = 0; STaskCancelMsg *msg = pMsg->pCont; if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { + qError("invalid task cancel msg"); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - QW_ERR_RET(qwCancelTask(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId)); + QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId)); - QW_ERR_RET(qwBuildAndSendCancelRsp(pMsg)); +_return: + + QW_ERR_RET(qwBuildAndSendCancelRsp(pMsg, code)); return TSDB_CODE_SUCCESS; } -int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp) { - if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == rsp) { +int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { + if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { return TSDB_CODE_QRY_INVALID_INPUT; } + int32_t code = 0; STaskDropMsg *msg = pMsg->pCont; if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { + qError("invalid task drop msg"); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - QW_ERR_RET(qwCancelDropTask(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId)); + QW_ERR_JRET(qwCancelDropTask(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId)); + +_return: - QW_ERR_RET(qwBuildAndSendDropRsp(pMsg)); + QW_ERR_RET(qwBuildAndSendDropRsp(pMsg, code)); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index f3de499dcd..bc7bc44350 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -43,6 +43,17 @@ typedef struct SSchedulerMgmt { SHashObj *jobs; // key: queryId, value: SQueryJob* } SSchedulerMgmt; +typedef struct SQueryLevel { + int32_t level; + int8_t status; + SRWLatch lock; + int32_t taskFailed; + int32_t taskSucceed; + int32_t taskNum; + SArray *subTasks; // Element is SQueryTask +} SQueryLevel; + + typedef struct SQueryTask { uint64_t taskId; // task id SQueryLevel *level; // level @@ -57,16 +68,6 @@ typedef struct SQueryTask { SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask* } SQueryTask; -typedef struct SQueryLevel { - int32_t level; - int8_t status; - SRWLatch lock; - int32_t taskFailed; - int32_t taskSucceed; - int32_t taskNum; - SArray *subTasks; // Element is SQueryTask -} SQueryLevel; - typedef struct SQueryJob { uint64_t queryId; int32_t levelNum; @@ -92,8 +93,8 @@ typedef struct SQueryJob { #define SCH_HAS_QNODE_IN_CLUSTER(type) (false) //TODO CLUSTER TYPE #define SCH_TASK_READY_TO_LUNCH(task) ((task)->childReady >= taosArrayGetSize((task)->children)) // MAY NEED TO ENHANCE -#define SCH_IS_DATA_SRC_TASK(task) (task->plan->type == QUERY_TYPE_SCAN) -#define SCH_TASK_NEED_WAIT_ALL(type) (task->plan->type == QUERY_TYPE_MODIFY) +#define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->type == QUERY_TYPE_SCAN) +#define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->type == QUERY_TYPE_MODIFY) #define SCH_JOB_ERR_LOG(param, ...) qError("QID:%"PRIx64 param, job->queryId, __VA_ARGS__) #define SCH_TASK_ERR_LOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64 param, job->queryId, task->taskId, __VA_ARGS__) diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index a2fbdbe924..4185b3176c 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -647,9 +647,18 @@ void schDropJobAllTasks(SQueryJob *job) { while (pIter) { SQueryTask *task = *(SQueryTask **)pIter; - schAsyncSendMsg(job, task, int32_t msgType); + schAsyncSendMsg(job, task, TSDB_MSG_TYPE_DROP_TASK); - pIter = taosHashIterate(schStatus->tasksHash, pIter); + pIter = taosHashIterate(job->succTasks, pIter); + } + + pIter = taosHashIterate(job->failTasks, NULL); + while (pIter) { + SQueryTask *task = *(SQueryTask **)pIter; + + schAsyncSendMsg(job, task, TSDB_MSG_TYPE_DROP_TASK); + + pIter = taosHashIterate(job->succTasks, pIter); } } -- GitLab