From 5cf4edacbb6da2a5b554396f5fc7ccee6b80cd47 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 12 Jan 2022 18:40:22 +0800 Subject: [PATCH] feature/qnode --- include/common/tmsg.h | 7 + include/common/tmsgdef.h | 2 + include/libs/qworker/qworker.h | 19 +- source/dnode/vnode/impl/inc/vnodeQuery.h | 1 + source/dnode/vnode/impl/src/vnodeQuery.c | 17 +- source/libs/executor/src/executorMain.c | 6 +- source/libs/qworker/inc/qworkerInt.h | 76 +++-- source/libs/qworker/src/qworker.c | 381 ++++++++++++++++------ source/libs/qworker/test/qworkerTests.cpp | 13 +- 9 files changed, 383 insertions(+), 139 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 2aaa2168cc..339db43374 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -993,6 +993,13 @@ typedef struct { uint64_t taskId; } SSinkDataReq; +typedef struct { + SMsgHead header; + uint64_t sId; + uint64_t queryId; + uint64_t taskId; +} SQueryContinueReq; + typedef struct { SMsgHead header; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 592672b32b..2f4bae2fa0 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -170,6 +170,8 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_DROP_TOPIC, "vnode-drop-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_SHOW_TABLES, "vnode-show-tables", SVShowTablesReq, SVShowTablesRsp) TD_DEF_MSG_TYPE(TDMT_VND_SHOW_TABLES_FETCH, "vnode-show-tables-fetch", SVShowTablesFetchReq, SVShowTablesFetchRsp) + TD_DEF_MSG_TYPE(TDMT_VND_QUERY_CONTINUE, "vnode-query-continue", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_SCHEDULE_DATA_SINK, "vnode-schedule-data-sink", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp) diff --git a/include/libs/qworker/qworker.h b/include/libs/qworker/qworker.h index 9897467230..08b5fb98e7 100644 --- a/include/libs/qworker/qworker.h +++ b/include/libs/qworker/qworker.h @@ -22,9 +22,18 @@ extern "C" { #include "trpc.h" + +enum { + NODE_TYPE_VNODE = 1, + NODE_TYPE_QNODE, + NODE_TYPE_SNODE, +}; + + + typedef struct SQWorkerCfg { uint32_t maxSchedulerNum; - uint32_t maxResCacheNum; + uint32_t maxTaskNum; uint32_t maxSchTaskNum; } SQWorkerCfg; @@ -39,11 +48,17 @@ typedef struct { uint64_t numOfErrors; } SQWorkerStat; +typedef int32_t (*putReqToQueryQFp)(void *, struct SRpcMsg *); -int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt); + +int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, void *nodeObj, putReqToQueryQFp fp); int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); +int32_t qWorkerProcessQueryContinueMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); + +int32_t qWorkerProcessDataSinkMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); + int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); diff --git a/source/dnode/vnode/impl/inc/vnodeQuery.h b/source/dnode/vnode/impl/inc/vnodeQuery.h index d43f5b1cf1..b51aafb313 100644 --- a/source/dnode/vnode/impl/inc/vnodeQuery.h +++ b/source/dnode/vnode/impl/inc/vnodeQuery.h @@ -22,6 +22,7 @@ extern "C" { #include "vnodeInt.h" #include "qworker.h" + typedef struct SQWorkerMgmt SQHandle; diff --git a/source/dnode/vnode/impl/src/vnodeQuery.c b/source/dnode/vnode/impl/src/vnodeQuery.c index 909b233efb..52b20f8411 100644 --- a/source/dnode/vnode/impl/src/vnodeQuery.c +++ b/source/dnode/vnode/impl/src/vnodeQuery.c @@ -19,11 +19,22 @@ static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg); static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); -int vnodeQueryOpen(SVnode *pVnode) { return qWorkerInit(NULL, &pVnode->pQuery); } +int vnodeQueryOpen(SVnode *pVnode) { return qWorkerInit(NODE_TYPE_VNODE, pVnode->vgId, NULL, &pVnode->pQuery, pVnode, vnodePutReqToVQueryQ); } int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { - vTrace("query message is processed"); - return qWorkerProcessQueryMsg(pVnode->pTsdb, pVnode->pQuery, pMsg); + vTrace("query message is processing"); + + switch (pMsg->msgType) { + case TDMT_VND_QUERY: + return qWorkerProcessQueryMsg(pVnode->pTsdb, pVnode->pQuery, pMsg); + case TDMT_VND_QUERY_CONTINUE: + return qWorkerProcessQueryContinueMsg(pVnode->pTsdb, pVnode->pQuery, pMsg); + case TDMT_VND_SCHEDULE_DATA_SINK: + return qWorkerProcessDataSinkMsg(pVnode->pTsdb, pVnode->pQuery, pMsg); + default: + vError("unknown msg type:%d in query queue", pMsg->msgType); + return TSDB_CODE_VND_APP_ERROR; + } } int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index 50f69fb567..daeefba253 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -178,8 +178,10 @@ int32_t qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) { publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC); int64_t st = 0; - *handle = pTaskInfo->dsHandle; - + if (handle) { + *handle = pTaskInfo->dsHandle; + } + while(1) { st = taosGetTimestampUs(); SSDataBlock* pRes = pTaskInfo->pRoot->exec(pTaskInfo->pRoot, &newgroup); diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index 7883079fbe..b5ec50cc04 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -23,7 +23,7 @@ extern "C" { #include "tlockfree.h" #define QWORKER_DEFAULT_SCHEDULER_NUMBER 10000 -#define QWORKER_DEFAULT_RES_CACHE_NUMBER 10000 +#define QWORKER_DEFAULT_TASK_NUMBER 10000 #define QWORKER_DEFAULT_SCH_TASK_NUMBER 10000 enum { @@ -57,7 +57,6 @@ enum { QW_ADD_ACQUIRE, }; - typedef struct SQWTaskStatus { SRWLatch lock; int32_t code; @@ -67,12 +66,14 @@ typedef struct SQWTaskStatus { bool drop; } SQWTaskStatus; -typedef struct SQWorkerTaskHandlesCache { +typedef struct SQWTaskCtx { SRWLatch lock; + int8_t sinkScheduled; + int8_t queryScheduled; bool needRsp; qTaskInfo_t taskHandle; DataSinkHandle sinkHandle; -} SQWorkerTaskHandlesCache; +} SQWTaskCtx; typedef struct SQWSchStatus { int32_t lastAccessTs; // timestamp in second @@ -82,11 +83,15 @@ typedef struct SQWSchStatus { // Qnode/Vnode level task management typedef struct SQWorkerMgmt { - SQWorkerCfg cfg; - SRWLatch schLock; - SRWLatch resLock; - SHashObj *schHash; //key: schedulerId, value: SQWSchStatus - SHashObj *resHash; //key: queryId+taskId, value: SQWorkerResCache + SQWorkerCfg cfg; + int8_t nodeType; + int32_t nodeId; + SRWLatch schLock; + SRWLatch ctxLock; + SHashObj *schHash; //key: schedulerId, value: SQWSchStatus + SHashObj *ctxHash; //key: queryId+taskId, value: SQWTaskCtx + void *nodeObj; + putReqToQueryQFp putToQueueFp; } SQWorkerMgmt; #define QW_GOT_RES_DATA(data) (true) @@ -95,40 +100,63 @@ typedef struct SQWorkerMgmt { #define QW_TASK_NOT_EXIST(code) (TSDB_CODE_QRY_SCH_NOT_EXIST == (code) || TSDB_CODE_QRY_TASK_NOT_EXIST == (code)) #define QW_TASK_ALREADY_EXIST(code) (TSDB_CODE_QRY_TASK_ALREADY_EXIST == (code)) #define QW_TASK_READY_RESP(status) (status == JOB_TASK_STATUS_SUCCEED || status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED || status == JOB_TASK_STATUS_PARTIAL_SUCCEED) -#define QW_SET_QTID(id, qid, tid) do { *(uint64_t *)(id) = (qid); *(uint64_t *)((char *)(id) + sizeof(qid)) = (tid); } while (0) -#define QW_GET_QTID(id, qid, tid) do { (qid) = *(uint64_t *)(id); (tid) = *(uint64_t *)((char *)(id) + sizeof(qid)); } while (0) - +#define QW_SET_QTID(id, qId, tId) do { *(uint64_t *)(id) = (qId); *(uint64_t *)((char *)(id) + sizeof(qId)) = (tId); } while (0) +#define QW_GET_QTID(id, qId, tId) do { (qId) = *(uint64_t *)(id); (tId) = *(uint64_t *)((char *)(id) + sizeof(qId)); } while (0) +#define QW_IDS() sId, qId, tId #define QW_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) #define QW_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) -#define QW_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); terrno = _code; return _code; } } while (0) #define QW_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) +#define QW_ELOG(param, ...) qError("QW:%p " param, mgmt, __VA_ARGS__) +#define QW_DLOG(param, ...) qDebug("QW:%p " param, mgmt, __VA_ARGS__) + +#define QW_SCH_ELOG(param, ...) qError("QW:%p SID:%"PRIx64 param, mgmt, sId, __VA_ARGS__) +#define QW_SCH_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64 param, mgmt, sId, __VA_ARGS__) + +#define QW_TASK_ELOG(param, ...) qError("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64 param, mgmt, sId, qId, tId, __VA_ARGS__) +#define QW_TASK_WLOG(param, ...) qWarn("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64 param, mgmt, sId, qId, tId, __VA_ARGS__) +#define QW_TASK_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64 param, mgmt, sId, qId, tId, __VA_ARGS__) + + +#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000 + #define QW_LOCK(type, _lock) do { \ if (QW_READ == (type)) { \ - if ((*(_lock)) < 0) assert(0); \ - taosRLockLatch(_lock); \ - qDebug("QW RLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ + assert(atomic_load_32((_lock)) >= 0); \ + qDebug("QW RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + taosRLockLatch(_lock); \ + qDebug("QW RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + assert(atomic_load_32((_lock)) > 0); \ } else { \ - if ((*(_lock)) < 0) assert(0); \ + assert(atomic_load_32((_lock)) >= 0); \ + qDebug("QW WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ taosWLockLatch(_lock); \ - qDebug("QW WLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ + qDebug("QW WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \ } \ } while (0) - + #define QW_UNLOCK(type, _lock) do { \ if (QW_READ == (type)) { \ - if ((*(_lock)) <= 0) assert(0); \ + assert(atomic_load_32((_lock)) > 0); \ + qDebug("QW RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ taosRUnLockLatch(_lock); \ - qDebug("QW RULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ + qDebug("QW RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + assert(atomic_load_32((_lock)) >= 0); \ } else { \ - if ((*(_lock)) <= 0) assert(0); \ + assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \ + qDebug("QW WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ taosWUnLockLatch(_lock); \ - qDebug("QW WULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ + qDebug("QW WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + assert(atomic_load_32((_lock)) >= 0); \ } \ } while (0) -static int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch, int32_t nOpt); + + +static int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch); +static int32_t qwAddAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch); #ifdef __cplusplus diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 2a395fcfe1..23d74ae91d 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -8,7 +8,7 @@ #include "tname.h" #include "dataSinkMgt.h" -int32_t qwValidateStatus(int8_t oriStatus, int8_t newStatus) { +int32_t qwValidateStatus(SQWorkerMgmt *mgmt, int8_t oriStatus, int8_t newStatus, uint64_t sId, uint64_t qId, uint64_t tId) { int32_t code = 0; if (oriStatus == newStatus) { @@ -62,7 +62,7 @@ int32_t qwValidateStatus(int8_t oriStatus, int8_t newStatus) { break; default: - qError("invalid task status:%d", oriStatus); + QW_TASK_ELOG("invalid task status:%d", oriStatus); return TSDB_CODE_QRY_APP_ERROR; } @@ -70,22 +70,27 @@ int32_t qwValidateStatus(int8_t oriStatus, int8_t newStatus) { _return: - qError("invalid task status, from %d to %d", oriStatus, newStatus); - QW_ERR_RET(code); + QW_TASK_ELOG("invalid task status update from %d to %d", oriStatus, newStatus); + QW_RET(code); } -int32_t qwUpdateTaskInfo(SQWTaskStatus *task, int8_t type, void *data) { +int32_t qwUpdateTaskInfo(SQWorkerMgmt *mgmt, SQWTaskStatus *task, int8_t type, void *data, uint64_t sId, uint64_t qId, uint64_t tId) { int32_t code = 0; + int8_t origStatus = 0; switch (type) { case QW_TASK_INFO_STATUS: { int8_t newStatus = *(int8_t *)data; - QW_ERR_RET(qwValidateStatus(task->status, newStatus)); + QW_ERR_RET(qwValidateStatus(mgmt, task->status, newStatus, QW_IDS())); + + origStatus = task->status; task->status = newStatus; + + QW_TASK_DLOG("task status updated from %d to %d", origStatus, newStatus); break; } default: - qError("uknown task info type:%d", type); + QW_TASK_ELOG("unknown task info, type:%d", type); return TSDB_CODE_QRY_APP_ERROR; } @@ -96,18 +101,18 @@ int32_t qwAddTaskHandlesToCache(SQWorkerMgmt *mgmt, uint64_t qId, uint64_t tId, char id[sizeof(qId) + sizeof(tId)] = {0}; QW_SET_QTID(id, qId, tId); - SQWorkerTaskHandlesCache resCache = {0}; + SQWTaskCtx resCache = {0}; resCache.taskHandle = taskHandle; resCache.sinkHandle = sinkHandle; - QW_LOCK(QW_WRITE, &mgmt->resLock); - if (0 != taosHashPut(mgmt->resHash, id, sizeof(id), &resCache, sizeof(SQWorkerTaskHandlesCache))) { - QW_UNLOCK(QW_WRITE, &mgmt->resLock); + QW_LOCK(QW_WRITE, &mgmt->ctxLock); + if (0 != taosHashPut(mgmt->ctxHash, id, sizeof(id), &resCache, sizeof(SQWTaskCtx))) { + QW_UNLOCK(QW_WRITE, &mgmt->ctxLock); qError("taosHashPut queryId[%"PRIx64"] taskId[%"PRIx64"] to resHash failed", qId, tId); return TSDB_CODE_QRY_APP_ERROR; } - QW_UNLOCK(QW_WRITE, &mgmt->resLock); + QW_UNLOCK(QW_WRITE, &mgmt->ctxLock); return TSDB_CODE_SUCCESS; } @@ -116,7 +121,7 @@ static int32_t qwAddScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus newSch = {0}; newSch.tasksHash = taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); if (NULL == newSch.tasksHash) { - qError("taosHashInit %d failed", mgmt->cfg.maxSchTaskNum); + QW_SCH_DLOG("taosHashInit %d failed", mgmt->cfg.maxSchTaskNum); return TSDB_CODE_QRY_OUT_OF_MEMORY; } @@ -126,14 +131,18 @@ static int32_t qwAddScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, if (0 != code) { if (!HASH_NODE_EXIST(code)) { QW_UNLOCK(QW_WRITE, &mgmt->schLock); - qError("taosHashPut sId[%"PRIx64"] to scheduleHash failed", sId); + QW_SCH_ELOG("taosHashPut new sch to scheduleHash failed, errno:%d", errno); taosHashCleanup(newSch.tasksHash); return TSDB_CODE_QRY_APP_ERROR; } } QW_UNLOCK(QW_WRITE, &mgmt->schLock); - if (TSDB_CODE_SUCCESS == qwAcquireScheduler(rwType, mgmt, sId, sch, QW_NOT_EXIST_ADD)) { + if (TSDB_CODE_SUCCESS == qwAcquireScheduler(rwType, mgmt, sId, sch)) { + if (code) { + taosHashCleanup(newSch.tasksHash); + } + return TSDB_CODE_SUCCESS; } } @@ -141,7 +150,7 @@ static int32_t qwAddScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, return TSDB_CODE_SUCCESS; } -static int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch, int32_t nOpt) { +static int32_t qwAcquireSchedulerImpl(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch, int32_t nOpt) { QW_LOCK(rwType, &mgmt->schLock); *sch = taosHashGet(mgmt->schHash, &sId, sizeof(sId)); if (NULL == (*sch)) { @@ -159,6 +168,14 @@ static int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t s return TSDB_CODE_SUCCESS; } +static int32_t qwAddAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch) { + return qwAcquireSchedulerImpl(rwType, mgmt, sId, sch, QW_NOT_EXIST_ADD); +} + +static int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch) { + return qwAcquireSchedulerImpl(rwType, mgmt, sId, sch, QW_NOT_EXIST_RET_ERR); +} + static FORCE_INLINE void qwReleaseScheduler(int32_t rwType, SQWorkerMgmt *mgmt) { QW_UNLOCK(rwType, &mgmt->schLock); } @@ -234,7 +251,7 @@ int32_t qwAddTaskToSch(int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t static int32_t qwAddTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t status, int32_t eOpt, SQWSchStatus **sch, SQWTaskStatus **task) { SQWSchStatus *tsch = NULL; - QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &tsch, QW_NOT_EXIST_ADD)); + QW_ERR_RET(qwAddAcquireScheduler(QW_READ, mgmt, sId, &tsch)); int32_t code = qwAddTaskToSch(QW_READ, tsch, qId, tId, status, eOpt, task); if (code) { @@ -250,14 +267,14 @@ static int32_t qwAddTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_ QW_RET(code); } -static FORCE_INLINE int32_t qwAcquireTaskHandles(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, SQWorkerTaskHandlesCache **handles) { +static FORCE_INLINE int32_t qwAcquireTaskCtx(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, SQWTaskCtx **handles) { char id[sizeof(queryId) + sizeof(taskId)] = {0}; QW_SET_QTID(id, queryId, taskId); - QW_LOCK(rwType, &mgmt->resLock); - *handles = taosHashGet(mgmt->resHash, id, sizeof(id)); + QW_LOCK(rwType, &mgmt->ctxLock); + *handles = taosHashGet(mgmt->ctxHash, id, sizeof(id)); if (NULL == (*handles)) { - QW_UNLOCK(rwType, &mgmt->resLock); + QW_UNLOCK(rwType, &mgmt->ctxLock); return TSDB_CODE_QRY_RES_CACHE_NOT_EXIST; } @@ -265,7 +282,7 @@ static FORCE_INLINE int32_t qwAcquireTaskHandles(int32_t rwType, SQWorkerMgmt *m } static FORCE_INLINE void qwReleaseTaskResCache(int32_t rwType, SQWorkerMgmt *mgmt) { - QW_UNLOCK(rwType, &mgmt->resLock); + QW_UNLOCK(rwType, &mgmt->ctxLock); } @@ -273,7 +290,7 @@ int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRs SQWSchStatus *sch = NULL; int32_t taskNum = 0; - QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)); + QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch)); sch->lastAccessTs = taosGetTimestampSec(); @@ -319,7 +336,7 @@ int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRs int32_t qwUpdateSchLastAccess(SQWorkerMgmt *mgmt, uint64_t sId) { SQWSchStatus *sch = NULL; - QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)); + QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch)); sch->lastAccessTs = taosGetTimestampSec(); @@ -333,12 +350,12 @@ int32_t qwUpdateTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint6 SQWTaskStatus *task = NULL; int32_t code = 0; - QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)); + QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch)); QW_ERR_JRET(qwAcquireTask(QW_READ, sch, qId, tId, &task)); QW_LOCK(QW_WRITE, &task->lock); - qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &status); + qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &status, QW_IDS()); QW_UNLOCK(QW_WRITE, &task->lock); _return: @@ -355,7 +372,7 @@ int32_t qwGetTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint SQWTaskStatus *task = NULL; int32_t code = 0; - if (qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)) { + if (qwAcquireScheduler(QW_READ, mgmt, sId, &sch)) { *taskStatus = JOB_TASK_STATUS_NULL; return TSDB_CODE_SUCCESS; } @@ -376,17 +393,17 @@ int32_t qwGetTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint } -int32_t qwCancelTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId) { +int32_t qwCancelTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) { SQWSchStatus *sch = NULL; SQWTaskStatus *task = NULL; int32_t code = 0; - QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_ADD)); + QW_ERR_RET(qwAddAcquireScheduler(QW_READ, mgmt, sId, &sch)); - if (qwAcquireTask(QW_READ, sch, queryId, taskId, &task)) { + if (qwAcquireTask(QW_READ, sch, qId, tId, &task)) { qwReleaseScheduler(QW_READ, mgmt); - code = qwAddTask(mgmt, sId, queryId, taskId, JOB_TASK_STATUS_NOT_START, QW_EXIST_ACQUIRE, &sch, &task); + code = qwAddTask(mgmt, sId, qId, tId, JOB_TASK_STATUS_NOT_START, QW_EXIST_ACQUIRE, &sch, &task); if (code) { qwReleaseScheduler(QW_READ, mgmt); QW_ERR_RET(code); @@ -409,10 +426,10 @@ int32_t qwCancelTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_ return TSDB_CODE_SUCCESS; } else if (task->status == JOB_TASK_STATUS_FAILED || task->status == JOB_TASK_STATUS_SUCCEED || task->status == JOB_TASK_STATUS_PARTIAL_SUCCEED) { newStatus = JOB_TASK_STATUS_CANCELLED; - QW_ERR_JRET(qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &newStatus)); + QW_ERR_JRET(qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &newStatus, QW_IDS())); } else { newStatus = JOB_TASK_STATUS_CANCELLING; - QW_ERR_JRET(qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &newStatus)); + QW_ERR_JRET(qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &newStatus, QW_IDS())); } QW_UNLOCK(QW_WRITE, &task->lock); @@ -441,50 +458,60 @@ _return: QW_RET(code); } -int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId) { +int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) { SQWSchStatus *sch = NULL; SQWTaskStatus *task = NULL; int32_t code = 0; - char id[sizeof(queryId) + sizeof(taskId)] = {0}; - QW_SET_QTID(id, queryId, taskId); + + char id[sizeof(qId) + sizeof(tId)] = {0}; + QW_SET_QTID(id, qId, tId); - QW_LOCK(QW_WRITE, &mgmt->resLock); - if (mgmt->resHash) { - taosHashRemove(mgmt->resHash, id, sizeof(id)); + QW_LOCK(QW_WRITE, &mgmt->ctxLock); + if (mgmt->ctxHash) { + if (taosHashRemove(mgmt->ctxHash, id, sizeof(id))) { + QW_TASK_WLOG("taosHashRemove from ctx hash failed, id:%s", id); + } } - QW_UNLOCK(QW_WRITE, &mgmt->resLock); + QW_UNLOCK(QW_WRITE, &mgmt->ctxLock); - if (TSDB_CODE_SUCCESS != qwAcquireScheduler(QW_WRITE, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)) { - qWarn("scheduler %"PRIx64" doesn't exist", sId); + if (qwAcquireScheduler(QW_WRITE, mgmt, sId, &sch)) { + QW_TASK_WLOG("scheduler does not exist, sch:%p", sch); return TSDB_CODE_SUCCESS; } - if (qwAcquireTask(QW_WRITE, sch, queryId, taskId, &task)) { + if (qwAcquireTask(QW_WRITE, sch, qId, tId, &task)) { qwReleaseScheduler(QW_WRITE, mgmt); - qWarn("scheduler %"PRIx64" queryId %"PRIx64" taskId:%"PRIx64" doesn't exist", sId, queryId, taskId); + QW_TASK_WLOG("task does not exist, task:%p", task); return TSDB_CODE_SUCCESS; } - taosHashRemove(sch->tasksHash, id, sizeof(id)); + QW_TASK_DLOG("drop task, status:%d, code:%x, ready:%d, cancel:%d, drop:%d", task->status, task->code, task->ready, task->cancel, task->drop); + + if (taosHashRemove(sch->tasksHash, id, sizeof(id))) { + QW_TASK_ELOG("taosHashRemove task from hash failed, task:%p", task); + QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); + } + +_return: qwReleaseTask(QW_WRITE, sch); qwReleaseScheduler(QW_WRITE, mgmt); - return TSDB_CODE_SUCCESS; + QW_RET(code); } -int32_t qwCancelDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId) { +int32_t qwCancelDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) { SQWSchStatus *sch = NULL; SQWTaskStatus *task = NULL; int32_t code = 0; - QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_ADD)); + QW_ERR_RET(qwAddAcquireScheduler(QW_READ, mgmt, sId, &sch)); - if (qwAcquireTask(QW_READ, sch, queryId, taskId, &task)) { + if (qwAcquireTask(QW_READ, sch, qId, tId, &task)) { qwReleaseScheduler(QW_READ, mgmt); - code = qwAddTask(mgmt, sId, queryId, taskId, JOB_TASK_STATUS_NOT_START, QW_EXIST_ACQUIRE, &sch, &task); + code = qwAddTask(mgmt, sId, qId, tId, JOB_TASK_STATUS_NOT_START, QW_EXIST_ACQUIRE, &sch, &task); if (code) { qwReleaseScheduler(QW_READ, mgmt); QW_ERR_RET(code); @@ -500,7 +527,7 @@ int32_t qwCancelDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uin if (task->status == JOB_TASK_STATUS_EXECUTING) { newStatus = JOB_TASK_STATUS_DROPPING; - QW_ERR_JRET(qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &newStatus)); + QW_ERR_JRET(qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &newStatus, QW_IDS())); } else if (task->status == JOB_TASK_STATUS_CANCELLING || task->status == JOB_TASK_STATUS_DROPPING || task->status == JOB_TASK_STATUS_NOT_START) { QW_UNLOCK(QW_WRITE, &task->lock); qwReleaseTask(QW_READ, sch); @@ -512,7 +539,7 @@ int32_t qwCancelDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uin qwReleaseTask(QW_READ, sch); qwReleaseScheduler(QW_READ, mgmt); - QW_ERR_RET(qwDropTask(mgmt, sId, queryId, taskId)); + QW_ERR_RET(qwDropTask(mgmt, sId, qId, tId)); return TSDB_CODE_SUCCESS; } @@ -743,7 +770,7 @@ int32_t qwCheckAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryI SQWTaskStatus *task = NULL; int32_t code = 0; - QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)); + QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch)); QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task)); @@ -785,7 +812,7 @@ int32_t qwSetAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, SQWTaskStatus *task = NULL; int32_t code = 0; - QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)); + QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch)); QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task)); @@ -816,7 +843,7 @@ _return: QW_RET(code); } -int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, bool *needStop) { +int32_t qwCheckTaskCancelDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, bool *needStop) { SQWSchStatus *sch = NULL; SQWTaskStatus *task = NULL; int32_t code = 0; @@ -824,11 +851,11 @@ int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryI *needStop = false; - if (qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)) { + if (qwAcquireScheduler(QW_READ, mgmt, sId, &sch)) { return TSDB_CODE_SUCCESS; } - if (qwAcquireTask(QW_READ, sch, queryId, taskId, &task)) { + if (qwAcquireTask(QW_READ, sch, qId, tId, &task)) { qwReleaseScheduler(QW_READ, mgmt); return TSDB_CODE_SUCCESS; } @@ -836,9 +863,10 @@ int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryI QW_LOCK(QW_READ, &task->lock); if ((!task->cancel) && (!task->drop)) { - qError("no cancel or drop, but task:%"PRIx64" exists", taskId); + QW_TASK_ELOG("no cancel or drop but task exists, status:%d", task->status); QW_UNLOCK(QW_READ, &task->lock); + qwReleaseTask(QW_READ, sch); qwReleaseScheduler(QW_READ, mgmt); @@ -851,17 +879,21 @@ int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryI if (task->cancel) { QW_LOCK(QW_WRITE, &task->lock); - qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &status); + code = qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &status, QW_IDS()); QW_UNLOCK(QW_WRITE, &task->lock); + + QW_ERR_JRET(code); } if (task->drop) { qwReleaseTask(QW_READ, sch); qwReleaseScheduler(QW_READ, mgmt); - return qwDropTask(mgmt, sId, queryId, taskId); + QW_RET(qwDropTask(mgmt, sId, qId, tId)); } +_return: + qwReleaseTask(QW_READ, sch); qwReleaseScheduler(QW_READ, mgmt); @@ -875,7 +907,7 @@ int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint6 int32_t code = 0; int8_t newStatus = JOB_TASK_STATUS_CANCELLED; - code = qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_ADD); + code = qwAddAcquireScheduler(QW_READ, mgmt, sId, &sch); if (code) { qError("sId:%"PRIx64" not in cache", sId); QW_ERR_RET(code); @@ -895,7 +927,7 @@ int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint6 if (task->cancel) { QW_LOCK(QW_WRITE, &task->lock); - qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &newStatus); + qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &newStatus, QW_IDS()); QW_UNLOCK(QW_WRITE, &task->lock); } @@ -910,7 +942,7 @@ int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint6 if (!(task->cancel || task->drop)) { QW_LOCK(QW_WRITE, &task->lock); - qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &status); + qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &status, QW_IDS()); task->code = errCode; QW_UNLOCK(QW_WRITE, &task->lock); } @@ -921,6 +953,86 @@ int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint6 return TSDB_CODE_SUCCESS; } +int32_t qwScheduleDataSink(SQWTaskCtx *handles, SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) { + if (atomic_load_8(&handles->sinkScheduled)) { + qDebug("data sink already scheduled"); + return TSDB_CODE_SUCCESS; + } + + SSinkDataReq * req = (SSinkDataReq *)rpcMallocCont(sizeof(SSinkDataReq)); + if (NULL == req) { + qError("rpcMallocCont %d failed", (int32_t)sizeof(SSinkDataReq)); + QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + req->header.vgId = mgmt->nodeId; + req->sId = sId; + req->queryId = queryId; + req->taskId = taskId; + + SRpcMsg pNewMsg = { + .handle = pMsg->handle, + .ahandle = pMsg->ahandle, + .msgType = TDMT_VND_SCHEDULE_DATA_SINK, + .pCont = req, + .contLen = sizeof(SSinkDataReq), + .code = 0, + }; + + int32_t code = (*mgmt->putToQueueFp)(mgmt->nodeObj, &pNewMsg); + if (TSDB_CODE_SUCCESS != code) { + qError("put data sink schedule msg to queue failed, code:%x", code); + rpcFreeCont(req); + QW_ERR_RET(code); + } + + qDebug("put data sink schedule msg to query queue"); + + return TSDB_CODE_SUCCESS; +} + +int32_t qwScheduleQuery(SQWTaskCtx *handles, SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) { + if (atomic_load_8(&handles->queryScheduled)) { + qDebug("query already scheduled"); + return TSDB_CODE_SUCCESS; + } + + QW_ERR_RET(qwUpdateTaskStatus(mgmt, sId, queryId, taskId, JOB_TASK_STATUS_EXECUTING)); + + SQueryContinueReq * req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq)); + if (NULL == req) { + qError("rpcMallocCont %d failed", (int32_t)sizeof(SQueryContinueReq)); + QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + req->header.vgId = mgmt->nodeId; + req->sId = sId; + req->queryId = queryId; + req->taskId = taskId; + + SRpcMsg pNewMsg = { + .handle = pMsg->handle, + .ahandle = pMsg->ahandle, + .msgType = TDMT_VND_QUERY_CONTINUE, + .pCont = req, + .contLen = sizeof(SQueryContinueReq), + .code = 0, + }; + + int32_t code = (*mgmt->putToQueueFp)(mgmt->nodeObj, &pNewMsg); + if (TSDB_CODE_SUCCESS != code) { + qError("put query continue msg to queue failed, code:%x", code); + rpcFreeCont(req); + QW_ERR_RET(code); + } + + + qDebug("put query continue msg to query queue"); + + return TSDB_CODE_SUCCESS; +} + + int32_t qwHandleFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) { SQWSchStatus *sch = NULL; @@ -932,11 +1044,15 @@ int32_t qwHandleFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64 int32_t dataLength = 0; SRetrieveTableRsp *rsp = NULL; bool queryEnd = false; - SQWorkerTaskHandlesCache *handles = NULL; + SQWTaskCtx *handles = NULL; - QW_ERR_JRET(qwAcquireTaskHandles(QW_READ, mgmt, queryId, taskId, &handles)); + QW_ERR_JRET(qwAcquireTaskCtx(QW_READ, mgmt, queryId, taskId, &handles)); + if (atomic_load_8(&handles->needRsp)) { + qError("last fetch not responsed"); + QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); + } - QW_ERR_JRET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)); + QW_ERR_JRET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch)); QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task)); QW_LOCK(QW_READ, &task->lock); @@ -974,15 +1090,15 @@ int32_t qwHandleFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64 if (DS_BUF_EMPTY == output.bufStatus && output.queryEnd) { rsp->completed = 1; + + QW_ERR_JRET(qwUpdateTaskStatus(mgmt, sId, queryId, taskId, JOB_TASK_STATUS_SUCCEED)); } + // Note: schedule data sink firstly and will schedule query after it's done if (output.needSchedule) { - //TODO - } - - if ((!output.queryEnd) && DS_BUF_LOW == output.bufStatus) { - //TODO - //UPDATE STATUS TO EXECUTING + QW_ERR_JRET(qwScheduleDataSink(handles, mgmt, sId, queryId, taskId, pMsg)); + } else if ((!output.queryEnd) && (DS_BUF_LOW == output.bufStatus || DS_BUF_EMPTY == output.bufStatus)) { + QW_ERR_JRET(qwScheduleQuery(handles, mgmt, sId, queryId, taskId, pMsg)); } } else { if (dataLength < 0) { @@ -991,12 +1107,11 @@ int32_t qwHandleFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64 } if (queryEnd) { - QW_ERR_JRET(qwQueryPostProcess(mgmt, sId, queryId, taskId, JOB_TASK_STATUS_SUCCEED, code)); + QW_ERR_JRET(qwUpdateTaskStatus(mgmt, sId, queryId, taskId, JOB_TASK_STATUS_SUCCEED)); } else { - if (task->status != JOB_TASK_STATUS_EXECUTING) { - qError("invalid status %d for fetch without res", task->status); - QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); - } + assert(0 == handles->needRsp); + + qDebug("no res data in sink, need response later"); QW_LOCK(QW_WRITE, &handles->lock); handles->needRsp = true; @@ -1028,7 +1143,12 @@ _return: QW_RET(code); } -int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt) { +int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, void *nodeObj, putReqToQueryQFp fp) { + if (NULL == qWorkerMgmt || NULL == nodeObj || NULL == fp) { + qError("invalid param to init qworker"); + QW_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + SQWorkerMgmt *mgmt = calloc(1, sizeof(SQWorkerMgmt)); if (NULL == mgmt) { qError("calloc %d failed", (int32_t)sizeof(SQWorkerMgmt)); @@ -1037,29 +1157,46 @@ int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt) { if (cfg) { mgmt->cfg = *cfg; + if (0 == mgmt->cfg.maxSchedulerNum) { + mgmt->cfg.maxSchedulerNum = QWORKER_DEFAULT_SCHEDULER_NUMBER; + } + if (0 == mgmt->cfg.maxTaskNum) { + mgmt->cfg.maxTaskNum = QWORKER_DEFAULT_TASK_NUMBER; + } + if (0 == mgmt->cfg.maxSchTaskNum) { + mgmt->cfg.maxSchTaskNum = QWORKER_DEFAULT_SCH_TASK_NUMBER; + } } else { mgmt->cfg.maxSchedulerNum = QWORKER_DEFAULT_SCHEDULER_NUMBER; - mgmt->cfg.maxResCacheNum = QWORKER_DEFAULT_RES_CACHE_NUMBER; + mgmt->cfg.maxTaskNum = QWORKER_DEFAULT_TASK_NUMBER; mgmt->cfg.maxSchTaskNum = QWORKER_DEFAULT_SCH_TASK_NUMBER; } mgmt->schHash = taosHashInit(mgmt->cfg.maxSchedulerNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); if (NULL == mgmt->schHash) { tfree(mgmt); - QW_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler hash failed", mgmt->cfg.maxSchedulerNum); + qError("init %d scheduler hash failed", mgmt->cfg.maxSchedulerNum); + QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - mgmt->resHash = taosHashInit(mgmt->cfg.maxResCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - if (NULL == mgmt->resHash) { + mgmt->ctxHash = taosHashInit(mgmt->cfg.maxTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + if (NULL == mgmt->ctxHash) { taosHashCleanup(mgmt->schHash); mgmt->schHash = NULL; tfree(mgmt); - - QW_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d res cache hash failed", mgmt->cfg.maxResCacheNum); + qError("init %d task ctx hash failed", mgmt->cfg.maxTaskNum); + QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + mgmt->nodeType = nodeType; + mgmt->nodeId = nodeId; + mgmt->nodeObj = nodeObj; + mgmt->putToQueueFp = fp; + *qWorkerMgmt = mgmt; + qDebug("qworker initialized for node, type:%d, id:%d, handle:%p", mgmt->nodeType, mgmt->nodeId, mgmt); + return TSDB_CODE_SUCCESS; } @@ -1069,25 +1206,31 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { } int32_t code = 0; + bool queryRsped = false; + bool needStop = false; + struct SSubplan *plan = NULL; SSubQueryMsg *msg = pMsg->pCont; + SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; + if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { - qError("invalid query msg"); + QW_ELOG("invalid query msg, contLen:%d", pMsg->contLen); QW_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } - msg->sId = htobe64(msg->sId); - msg->queryId = htobe64(msg->queryId); - msg->taskId = htobe64(msg->taskId); + msg->sId = be64toh(msg->sId); + msg->queryId = be64toh(msg->queryId); + msg->taskId = be64toh(msg->taskId); msg->contentLen = ntohl(msg->contentLen); - bool queryRsped = false; - bool needStop = false; - struct SSubplan *plan = NULL; + uint64_t sId = msg->sId; + uint64_t qId = msg->queryId; + uint64_t tId = msg->taskId; QW_ERR_JRET(qwCheckTaskCancelDrop(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, &needStop)); if (needStop) { qWarn("task need stop"); - QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED); + qwBuildAndSendQueryRsp(pMsg, TSDB_CODE_QRY_TASK_CANCELLED); + QW_ERR_RET(TSDB_CODE_QRY_TASK_CANCELLED); } code = qStringToSubplan(msg->msg, &plan); @@ -1144,30 +1287,59 @@ int32_t qWorkerProcessQueryContinueMsg(void *node, void *qWorkerMgmt, SRpcMsg *p int32_t code = 0; int8_t status = 0; bool queryDone = false; - uint64_t sId, qId, tId; + SQueryContinueReq *req = (SQueryContinueReq *)pMsg->pCont; + bool needStop = false; + SQWTaskCtx *handles = NULL; - //TODO call executer to continue execute subquery - code = 0; - void *data = NULL; - queryDone = false; - //TODO call executer to continue execute subquery + QW_ERR_JRET(qwAcquireTaskCtx(QW_READ, qWorkerMgmt, req->queryId, req->taskId, &handles)); + + qTaskInfo_t taskHandle = handles->taskHandle; + DataSinkHandle sinkHandle = handles->sinkHandle; + bool needRsp = handles->needRsp; + + qwReleaseTaskResCache(QW_READ, qWorkerMgmt); + + QW_ERR_JRET(qwCheckTaskCancelDrop(qWorkerMgmt, req->sId, req->queryId, req->taskId, &needStop)); + if (needStop) { + qWarn("task need stop"); + if (needRsp) { + qwBuildAndSendQueryRsp(pMsg, TSDB_CODE_QRY_TASK_CANCELLED); + } + QW_ERR_RET(TSDB_CODE_QRY_TASK_CANCELLED); + } + + DataSinkHandle newHandle = NULL; + code = qExecTask(taskHandle, &newHandle); + if (code) { + qError("qExecTask failed, code:%x", code); + QW_ERR_JRET(code); + } + + if (sinkHandle != newHandle) { + qError("data sink mis-match"); + QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); + } + +_return: + + if (needRsp) { + code = qwBuildAndSendQueryRsp(pMsg, code); + } if (TSDB_CODE_SUCCESS != code) { status = JOB_TASK_STATUS_FAILED; - } else if (queryDone) { - status = JOB_TASK_STATUS_SUCCEED; } else { status = JOB_TASK_STATUS_PARTIAL_SUCCEED; } - code = qwQueryPostProcess(qWorkerMgmt, sId, qId, tId, status, code); - + code = qwQueryPostProcess(qWorkerMgmt, req->sId, req->queryId, req->taskId, status, code); + QW_RET(code); } -int32_t qWorkerProcessSinkDataMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){ +int32_t qWorkerProcessDataSinkMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){ if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { return TSDB_CODE_QRY_INVALID_INPUT; } @@ -1176,8 +1348,9 @@ int32_t qWorkerProcessSinkDataMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){ if (NULL == msg || pMsg->contLen < sizeof(*msg)) { qError("invalid sink data msg"); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); - } + } + //dsScheduleProcess(); //TODO return TSDB_CODE_SUCCESS; diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp index eaa79fd39a..4962eab460 100644 --- a/source/libs/qworker/test/qworkerTests.cpp +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -42,6 +42,11 @@ int32_t qwtStringToPlan(const char* str, SSubplan** subplan) { return 0; } +int32_t qwtPutReqToQueue(void *node, struct SRpcMsg *pMsg) { + return 0; +} + + void qwtRpcSendResponse(const SRpcMsg *pRsp) { if (TDMT_VND_TASKS_STATUS_RSP == pRsp->msgType) { SSchedulerStatusRsp *rsp = (SSchedulerStatusRsp *)pRsp->pCont; @@ -258,7 +263,7 @@ TEST(seqTest, normalCase) { stubSetStringToPlan(); stubSetRpcSendResponse(); - code = qWorkerInit(NULL, &mgmt); + code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue); ASSERT_EQ(code, 0); statusMsg.sId = htobe64(1); @@ -328,7 +333,7 @@ TEST(seqTest, cancelFirst) { stubSetStringToPlan(); stubSetRpcSendResponse(); - code = qWorkerInit(NULL, &mgmt); + code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue); ASSERT_EQ(code, 0); statusMsg.sId = htobe64(1); @@ -402,7 +407,7 @@ TEST(seqTest, randCase) { srand(time(NULL)); - code = qWorkerInit(NULL, &mgmt); + code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue); ASSERT_EQ(code, 0); int32_t t = 0; @@ -446,7 +451,7 @@ TEST(seqTest, multithreadRand) { srand(time(NULL)); - code = qWorkerInit(NULL, &mgmt); + code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue); ASSERT_EQ(code, 0); pthread_attr_t thattr; -- GitLab