From 8e078d63223ecce9d4864058f2627d5746f04469 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 23 Dec 2021 17:42:40 +0800 Subject: [PATCH] feature/qnode --- include/common/taosmsg.h | 30 +- include/libs/qcom/query.h | 5 +- include/libs/qworker/qworker.h | 12 +- include/util/taoserror.h | 5 + include/util/thash.h | 2 + source/dnode/mgmt/impl/src/dndTransport.c | 4 + source/libs/catalog/test/catalogTests.cpp | 23 + source/libs/qworker/inc/qworkerInt.h | 45 +- source/libs/qworker/src/qworker.c | 971 +++++++++++++++++++--- source/libs/scheduler/src/scheduler.c | 13 +- source/util/src/terror.c | 5 +- source/util/src/thash.c | 2 +- 12 files changed, 962 insertions(+), 155 deletions(-) diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index a8281d95a5..770a118bd5 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -52,6 +52,10 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_DISCONNECT, "mq-disconnect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_SET_CUR, "mq-set-cur" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_RES_READY, "res-ready" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_TASKS_STATUS, "tasks-status" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CANCEL_TASK, "cancel-task" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_TASK, "drop-task" ) + // message from client to mnode TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONNECT, "connect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_ACCT, "create-acct" ) @@ -1093,29 +1097,29 @@ typedef struct { /* data */ } SUpdateTagValRsp; -typedef struct SSchedulerQueryMsg { +typedef struct SSubQueryMsg { uint64_t schedulerId; uint64_t queryId; uint64_t taskId; uint32_t contentLen; char msg[]; -} SSchedulerQueryMsg; +} SSubQueryMsg; -typedef struct SSchedulerReadyMsg { +typedef struct SResReadyMsg { uint64_t schedulerId; uint64_t queryId; uint64_t taskId; -} SSchedulerReadyMsg; +} SResReadyMsg; -typedef struct SSchedulerFetchMsg { +typedef struct SResFetchMsg { uint64_t schedulerId; uint64_t queryId; uint64_t taskId; -} SSchedulerFetchMsg; +} SResFetchMsg; -typedef struct SSchedulerStatusMsg { +typedef struct SSchTasksStatusMsg { uint64_t schedulerId; -} SSchedulerStatusMsg; +} SSchTasksStatusMsg; typedef struct STaskStatus { uint64_t queryId; @@ -1129,11 +1133,17 @@ typedef struct SSchedulerStatusRsp { } SSchedulerStatusRsp; -typedef struct SSchedulerCancelMsg { +typedef struct STaskCancelMsg { + uint64_t schedulerId; + uint64_t queryId; + uint64_t taskId; +} STaskCancelMsg; + +typedef struct STaskDropMsg { uint64_t schedulerId; uint64_t queryId; uint64_t taskId; -} SSchedulerCancelMsg; +} STaskDropMsg; #pragma pack(pop) diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 4fcbc1c528..31b96adec0 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -25,12 +25,15 @@ extern "C" { #include "tlog.h" enum { + JOB_TASK_STATUS_NULL = 0, JOB_TASK_STATUS_NOT_START = 1, JOB_TASK_STATUS_EXECUTING, + JOB_TASK_STATUS_PARTIAL_SUCCEED, JOB_TASK_STATUS_SUCCEED, JOB_TASK_STATUS_FAILED, JOB_TASK_STATUS_CANCELLING, - JOB_TASK_STATUS_CANCELLED + JOB_TASK_STATUS_CANCELLED, + JOB_TASK_STATUS_DROPPING, }; typedef struct STableComInfo { diff --git a/include/libs/qworker/qworker.h b/include/libs/qworker/qworker.h index 63a6b6f89b..8e36178497 100644 --- a/include/libs/qworker/qworker.h +++ b/include/libs/qworker/qworker.h @@ -42,15 +42,17 @@ typedef struct { int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt); -int32_t qWorkerProcessQueryMsg(void *qWorkerMgmt, SSchedulerQueryMsg *msg, SRpcMsg *rsp); +int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg **rsp); -int32_t qWorkerProcessReadyMsg(void *qWorkerMgmt, SSchedulerReadyMsg *msg, SRpcMsg *rsp); +int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp); -int32_t qWorkerProcessStatusMsg(void *qWorkerMgmt, SSchedulerStatusMsg *msg, SRpcMsg *rsp); +int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp); -int32_t qWorkerProcessFetchMsg(void *qWorkerMgmt, SSchedulerFetchMsg *msg, SRpcMsg *rsp); +int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp); -int32_t qWorkerProcessCancelMsg(void *qWorkerMgmt, SSchedulerCancelMsg *msg, SRpcMsg *rsp); +int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp); + +int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp); void qWorkerDestroy(void **qWorkerMgmt); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 7e8df3add2..4362950844 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -314,6 +314,11 @@ int32_t* taosGetErrno(); #define TSDB_CODE_QRY_INVALID_TIME_CONDITION TAOS_DEF_ERROR_CODE(0, 0x070D) //"invalid time condition") #define TSDB_CODE_QRY_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x070E) //"System error") #define TSDB_CODE_QRY_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0x070F) //"invalid input") +#define TSDB_CODE_QRY_SCH_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0710) //"Scheduler not exist") +#define TSDB_CODE_QRY_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0711) //"Task not exist") +#define TSDB_CODE_QRY_TASK_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0712) //"Task already exist") +#define TSDB_CODE_QRY_RES_CACHE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0713) //"Task result cache not exist") +#define TSDB_CODE_QRY_TASK_CANCELLED TAOS_DEF_ERROR_CODE(0, 0x0714) //"Task cancelled") // grant diff --git a/include/util/thash.h b/include/util/thash.h index ebdc91f054..d0247a0729 100644 --- a/include/util/thash.h +++ b/include/util/thash.h @@ -33,6 +33,8 @@ typedef void (*_hash_free_fn_t)(void *); #define HASH_INDEX(v, c) ((v) & ((c)-1)) +#define HASH_NODE_EXIST(code) (code == -2) + /** * murmur hash algorithm * @key usually string diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index 50b1a1cf20..b0bb0bfc05 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -45,6 +45,10 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TSDB_MSG_TYPE_MQ_SET_CUR] = dndProcessVnodeWriteMsg; + pMgmt->msgFp[TSDB_MSG_TYPE_RES_READY] = dndProcessVnodeFetchMsg; + pMgmt->msgFp[TSDB_MSG_TYPE_TASKS_STATUS] = dndProcessVnodeFetchMsg; + pMgmt->msgFp[TSDB_MSG_TYPE_CANCEL_TASK] = dndProcessVnodeFetchMsg; + pMgmt->msgFp[TSDB_MSG_TYPE_DROP_TASK] = dndProcessVnodeFetchMsg; // msg from client to mnode pMgmt->msgFp[TSDB_MSG_TYPE_CONNECT] = dndProcessMnodeReadMsg; diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index 29c80ae9ec..4f41d728eb 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -29,6 +29,8 @@ #include "catalog.h" #include "tep.h" #include "trpc.h" +#include "stub.h" +#include "addr_any.h" typedef struct SAppInstInfo { int64_t numOfConns; @@ -86,6 +88,27 @@ void sendCreateDbMsg(void *shandle, SEpSet *pEpSet) { ASSERT_EQ(rpcRsp.code, 0); } +void __rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { + SUseDbRsp *rspMsg = NULL; //todo + + return; +} + + +void initTestEnv() { + static Stub stub; + stub.set(rpcSendRecv, __rpcSendRecv); + { + AddrAny any("libtransport.so"); + std::map result; + any.get_global_func_addr_dynsym("^rpcSendRecv$", result); + for (const auto& f : result) { + stub.set(f.second, __rpcSendRecv); + } + } +} + + } TEST(testCase, normalCase) { diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index 1adc09def4..6f454e2f81 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -20,6 +20,8 @@ extern "C" { #endif +#include "tlockfree.h" + #define QWORKER_DEFAULT_SCHEDULER_NUMBER 10000 #define QWORKER_DEFAULT_RES_CACHE_NUMBER 10000 #define QWORKER_DEFAULT_SCH_TASK_NUMBER 10000 @@ -30,36 +32,63 @@ enum { QW_READY_RESPONSED, }; -typedef struct SQWorkerTaskStatus { - int8_t status; - int8_t ready; +enum { + QW_TASK_INFO_STATUS = 1, + QW_TASK_INFO_READY, +}; + +enum { + QW_READ = 1, + QW_WRITE, +}; + +typedef struct SQWorkerTaskStatus { + SRWLatch lock; + int32_t code; + int8_t status; + int8_t ready; + bool cancel; + bool drop; } SQWorkerTaskStatus; typedef struct SQWorkerResCache { + SRWLatch lock; void *data; } SQWorkerResCache; -typedef struct SQWorkerSchTaskStatus { +typedef struct SQWorkerSchStatus { int32_t lastAccessTs; // timestamp in second - SHashObj *taskStatus; // key:queryId+taskId, value: SQWorkerTaskStatus -} SQWorkerSchTaskStatus; + SRWLatch tasksLock; + SHashObj *tasksHash; // key:queryId+taskId, value: SQWorkerTaskStatus +} SQWorkerSchStatus; // Qnode/Vnode level task management typedef struct SQWorkerMgmt { SQWorkerCfg cfg; - SHashObj *scheduleHash; //key: schedulerId, value: SQWorkerSchTaskStatus + SRWLatch schLock; + SRWLatch resLock; + SHashObj *schHash; //key: schedulerId, value: SQWorkerSchStatus SHashObj *resHash; //key: queryId+taskId, value: SQWorkerResCache } SQWorkerMgmt; -#define QW_TASK_DONE(status) (status == JOB_TASK_STATUS_SUCCEED || status == JOB_TASK_STATUS_FAILED || status == status == JOB_TASK_STATUS_CANCELLED) +#define QW_GOT_RES_DATA(data) (false) +#define QW_LOW_RES_DATA(data) (false) + +#define QW_TASK_NOT_EXIST(code) (TSDB_CODE_QRY_SCH_NOT_EXIST == (code) || TSDB_CODE_QRY_TASK_NOT_EXIST == (code)) +#define QW_TASK_ALREADY_EXIST(code) (TSDB_CODE_QRY_TASK_ALREADY_EXIST == (code)) +#define QW_TASK_READY_RESP(status) (status == JOB_TASK_STATUS_SUCCEED || status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED || status == JOB_TASK_STATUS_PARTIAL_SUCCEED) #define QW_SET_QTID(id, qid, tid) do { *(uint64_t *)(id) = (qid); *(uint64_t *)((char *)(id) + sizeof(qid)) = (tid); } while (0) #define QW_GET_QTID(id, qid, tid) do { (qid) = *(uint64_t *)(id); (tid) = *(uint64_t *)((char *)(id) + sizeof(qid)); } while (0) + #define QW_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) #define QW_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) #define QW_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); terrno = _code; return _code; } } while (0) #define QW_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) +#define QW_LOCK(type, _lock) (QW_READ == (type) ? taosRLockLatch(_lock) : taosWLockLatch(_lock)) +#define QW_UNLOCK(type, _lock) (QW_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock)) + #ifdef __cplusplus } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 82bfd75b6a..628077a020 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -4,69 +4,79 @@ #include "qworkerInt.h" #include "planner.h" -int32_t qwAddTaskStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, int8_t taskStatus) { - SQWorkerTaskStatus tStatus = {0}; - tStatus.status = taskStatus; - - char id[sizeof(queryId) + sizeof(taskId)] = {0}; - QW_SET_QTID(id, queryId, taskId); +int32_t qwCheckStatusSwitch(int8_t oriStatus, int8_t newStatus) { + int32_t code = 0; - SQWorkerSchTaskStatus *schStatus = taosHashGet(mgmt->scheduleHash, &schedulerId, sizeof(schedulerId)); - if (NULL == schStatus) { - SQWorkerSchTaskStatus newSchStatus = {0}; - newSchStatus.taskStatus = taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); - if (NULL == newSchStatus.taskStatus) { - qError("taosHashInit %d failed", mgmt->cfg.maxSchTaskNum); - return TSDB_CODE_QRY_OUT_OF_MEMORY; + if (oriStatus == newStatus) { + if (newStatus == JOB_TASK_STATUS_CANCELLING) { + return TSDB_CODE_SUCCESS; } - - if (0 != taosHashPut(newSchStatus.taskStatus, id, sizeof(id), &tStatus, sizeof(tStatus))) { - qError("taosHashPut schedulerId[%"PRIx64"]queryId[%"PRIx64"] taskId[%"PRIx64"] to scheduleHash failed", schedulerId, queryId, taskId); - taosHashCleanup(newSchStatus.taskStatus); - return TSDB_CODE_QRY_APP_ERROR; - } - - newSchStatus.lastAccessTs = taosGetTimestampSec(); - - if (0 != taosHashPut(mgmt->scheduleHash, &schedulerId, sizeof(schedulerId), &newSchStatus, sizeof(newSchStatus))) { - qError("taosHashPut schedulerId[%"PRIx64"] to scheduleHash failed", schedulerId); - taosHashCleanup(newSchStatus.taskStatus); + + QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); + } + + switch (oriStatus) { + case JOB_TASK_STATUS_NULL: + if (newStatus != JOB_TASK_STATUS_EXECUTING && newStatus != JOB_TASK_STATUS_FAILED ) { + QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); + } + + break; + case JOB_TASK_STATUS_NOT_START: + if (newStatus != JOB_TASK_STATUS_EXECUTING && newStatus != JOB_TASK_STATUS_FAILED) { + QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); + } + + break; + case JOB_TASK_STATUS_EXECUTING: + if (newStatus != JOB_TASK_STATUS_SUCCEED && newStatus != JOB_TASK_STATUS_FAILED && newStatus != JOB_TASK_STATUS_CANCELLING) { + QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); + } + + break; + case JOB_TASK_STATUS_PARTIAL_SUCCEED: + if (newStatus != JOB_TASK_STATUS_EXECUTING && newStatus != JOB_TASK_STATUS_CANCELLING) { + QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); + } + + break; + case JOB_TASK_STATUS_SUCCEED: + case JOB_TASK_STATUS_FAILED: + case JOB_TASK_STATUS_CANCELLING: + if (newStatus != JOB_TASK_STATUS_CANCELLED) { + QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); + } + + break; + case JOB_TASK_STATUS_CANCELLED: + default: + qError("invalid task status:%d", oriStatus); return TSDB_CODE_QRY_APP_ERROR; - } - - return TSDB_CODE_SUCCESS; } - schStatus->lastAccessTs = taosGetTimestampSec(); + return TSDB_CODE_SUCCESS; - if (0 != taosHashPut(schStatus->taskStatus, id, sizeof(id), &tStatus, sizeof(tStatus))) { - qError("taosHashPut schedulerId[%"PRIx64"]queryId[%"PRIx64"] taskId[%"PRIx64"] to scheduleHash failed", schedulerId, queryId, taskId); - return TSDB_CODE_QRY_APP_ERROR; - } +_return: - return TSDB_CODE_SUCCESS; + qError("invalid task status:%d", oriStatus); + QW_ERR_RET(code); } -int32_t qwGetTaskStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, SQWorkerTaskStatus **taskStatus) { - SQWorkerSchTaskStatus *schStatus = taosHashGet(mgmt->scheduleHash, &schedulerId, sizeof(schedulerId)); - if (NULL == schStatus) { - qError("no scheduler for schedulerId[%"PRIx64"]", schedulerId); - return TSDB_CODE_QRY_APP_ERROR; - } - - schStatus->lastAccessTs = taosGetTimestampSec(); +int32_t qwUpdateTaskInfo(SQWorkerTaskStatus *task, int8_t type, void *data) { + int32_t code = 0; - char id[sizeof(queryId) + sizeof(taskId)] = {0}; - QW_SET_QTID(id, queryId, taskId); - - SQWorkerTaskStatus *tStatus = taosHashGet(schStatus->taskStatus, id, sizeof(id)); - if (NULL == tStatus) { - qError("no task status for schedulerId[%"PRIx64"] queryId[%"PRIx64"] taskId[%"PRIx64"]", schedulerId, queryId, taskId); - return TSDB_CODE_QRY_APP_ERROR; + switch (type) { + case QW_TASK_INFO_STATUS: { + int8_t newStatus = *(int8_t *)data; + QW_ERR_RET(qwCheckStatusSwitch(task->status, newStatus)); + task->status = newStatus; + break; + } + default: + qError("uknown task info type:%d", type); + return TSDB_CODE_QRY_APP_ERROR; } - - *taskStatus = tStatus; - + return TSDB_CODE_SUCCESS; } @@ -76,12 +86,16 @@ int32_t qwAddTaskResult(SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, v SQWorkerResCache resCache = {0}; resCache.data = data; - + + QW_LOCK(QW_WRITE, &mgmt->resLock); if (0 != taosHashPut(mgmt->resHash, id, sizeof(id), &resCache, sizeof(SQWorkerResCache))) { + QW_UNLOCK(QW_WRITE, &mgmt->resLock); qError("taosHashPut queryId[%"PRIx64"] taskId[%"PRIx64"] to resHash failed", queryId, taskId); return TSDB_CODE_QRY_APP_ERROR; } + QW_UNLOCK(QW_WRITE, &mgmt->resLock); + return TSDB_CODE_SUCCESS; } @@ -101,62 +115,693 @@ int32_t qwGetTaskResult(SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, v return TSDB_CODE_SUCCESS; } -int32_t qwUpdateSchLastAccess(SQWorkerMgmt *mgmt, uint64_t schedulerId) { - SQWorkerSchTaskStatus *schStatus = taosHashGet(mgmt->scheduleHash, &schedulerId, sizeof(schedulerId)); - if (NULL == schStatus) { - qError("no scheduler for schedulerId[%"PRIx64"]", schedulerId); - return TSDB_CODE_QRY_APP_ERROR; + +static FORCE_INLINE int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t schedulerId, SQWorkerSchStatus **sch) { + QW_LOCK(rwType, &mgmt->schLock); + *sch = taosHashGet(mgmt->schHash, &schedulerId, sizeof(schedulerId)); + if (NULL == (*sch)) { + QW_LOCK(rwType, &mgmt->schLock); + return TSDB_CODE_QRY_SCH_NOT_EXIST; } - schStatus->lastAccessTs = taosGetTimestampSec(); + return TSDB_CODE_SUCCESS; +} + + +static FORCE_INLINE int32_t qwInsertAndAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t schedulerId, SQWorkerSchStatus **sch) { + SQWorkerSchStatus newSch = {0}; + newSch.tasksHash = taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + if (NULL == newSch.tasksHash) { + qError("taosHashInit %d failed", mgmt->cfg.maxSchTaskNum); + return TSDB_CODE_QRY_OUT_OF_MEMORY; + } + + while (true) { + QW_LOCK(QW_WRITE, &mgmt->schLock); + int32_t code = taosHashPut(mgmt->schHash, &schedulerId, sizeof(schedulerId), &newSch, sizeof(newSch)); + if (0 != code) { + if (!HASH_NODE_EXIST(code)) { + QW_UNLOCK(QW_WRITE, &mgmt->schLock); + qError("taosHashPut schedulerId[%"PRIx64"] to scheduleHash failed", schedulerId); + taosHashCleanup(newSch.tasksHash); + return TSDB_CODE_QRY_APP_ERROR; + } + } + + QW_UNLOCK(QW_WRITE, &mgmt->schLock); + if (TSDB_CODE_SUCCESS == qwAcquireScheduler(rwType, mgmt, schedulerId, sch)) { + taosHashCleanup(newSch.tasksHash); + return TSDB_CODE_SUCCESS; + } + } return TSDB_CODE_SUCCESS; } -int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, SSchedulerStatusRsp **rsp) { - SQWorkerSchTaskStatus *schStatus = taosHashGet(mgmt->scheduleHash, &schedulerId, sizeof(schedulerId)); - if (NULL == schStatus) { - qError("no scheduler for schedulerId[%"PRIx64"]", schedulerId); - return TSDB_CODE_QRY_APP_ERROR; + +static FORCE_INLINE void qwReleaseScheduler(int32_t rwType, SQWorkerMgmt *mgmt) { + QW_UNLOCK(rwType, &mgmt->schLock); +} + +static FORCE_INLINE int32_t qwAcquireTask(int32_t rwType, SQWorkerSchStatus *sch, uint64_t queryId, uint64_t taskId, SQWorkerTaskStatus **task) { + char id[sizeof(queryId) + sizeof(taskId)] = {0}; + QW_SET_QTID(id, queryId, taskId); + + QW_LOCK(rwType, &sch->tasksLock); + *task = taosHashGet(sch->tasksHash, id, sizeof(id)); + if (NULL == (*task)) { + QW_UNLOCK(rwType, &sch->tasksLock); + return TSDB_CODE_QRY_TASK_NOT_EXIST; } - schStatus->lastAccessTs = taosGetTimestampSec(); + return TSDB_CODE_SUCCESS; +} - int32_t i = 0; - int32_t taskNum = taosHashGetSize(schStatus->taskStatus); +static FORCE_INLINE int32_t qwInsertAndAcquireTask(int32_t rwType, SQWorkerSchStatus *sch, uint64_t queryId, uint64_t taskId, int8_t status, bool *inserted, SQWorkerTaskStatus **task) { + char id[sizeof(queryId) + sizeof(taskId)] = {0}; + QW_SET_QTID(id, queryId, taskId); + + while (true) { + *inserted = false; + + QW_LOCK(QW_WRITE, &sch->tasksLock); + int32_t code = taosHashPut(sch->tasksHash, id, sizeof(id), &status, sizeof(status)); + if (0 != code) { + QW_UNLOCK(QW_WRITE, &sch->tasksLock); + if (HASH_NODE_EXIST(code)) { + if (qwAcquireTask(rwType, sch, queryId, taskId, task)) { + continue; + } + + break; + } else { + qError("taosHashPut queryId[%"PRIx64"] taskId[%"PRIx64"] to scheduleHash failed", queryId, taskId); + return TSDB_CODE_QRY_APP_ERROR; + } + } + QW_UNLOCK(QW_WRITE, &sch->tasksLock); + + *inserted = true; + + if (TSDB_CODE_SUCCESS == qwAcquireTask(rwType, sch, queryId, taskId, task)) { + return TSDB_CODE_SUCCESS; + } + } + + return TSDB_CODE_SUCCESS; +} + + +static FORCE_INLINE void qwReleaseTask(int32_t rwType, SQWorkerSchStatus *sch) { + QW_UNLOCK(rwType, &sch->tasksLock); +} + +static FORCE_INLINE int32_t qwAcquireTaskResCache(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, SQWorkerResCache **res) { + char id[sizeof(queryId) + sizeof(taskId)] = {0}; + QW_SET_QTID(id, queryId, taskId); + + QW_LOCK(rwType, &mgmt->resLock); + *res = taosHashGet(mgmt->resHash, id, sizeof(id)); + if (NULL == (*res)) { + QW_UNLOCK(rwType, &mgmt->resLock); + return TSDB_CODE_QRY_RES_CACHE_NOT_EXIST; + } + + return TSDB_CODE_SUCCESS; +} + +static FORCE_INLINE void qwReleaseTaskResCache(int32_t rwType, SQWorkerMgmt *mgmt) { + QW_UNLOCK(rwType, &mgmt->resLock); +} + + +int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, SSchedulerStatusRsp **rsp) { + SQWorkerSchStatus *schStatus = NULL; + int32_t taskNum = 0; + + if (qwAcquireScheduler(QW_READ, mgmt, schedulerId, &schStatus)) { + qWarn("no scheduler for schedulerId[%"PRIx64"]", schedulerId); + } else { + schStatus->lastAccessTs = taosGetTimestampSec(); + + QW_LOCK(QW_READ, &schStatus->tasksLock); + taskNum = taosHashGetSize(schStatus->tasksHash); + } + int32_t size = sizeof(SSchedulerStatusRsp) + sizeof((*rsp)->status[0]) * taskNum; *rsp = calloc(1, size); if (NULL == *rsp) { qError("calloc %d failed", size); + if (schStatus) { + QW_UNLOCK(QW_READ, &schStatus->tasksLock); + qwReleaseScheduler(QW_READ, mgmt); + } + return TSDB_CODE_QRY_OUT_OF_MEMORY; } void *key = NULL; size_t keyLen = 0; - void *pIter = taosHashIterate(schStatus->taskStatus, NULL); - while (pIter) { - SQWorkerTaskStatus *taskStatus = (SQWorkerTaskStatus *)pIter; - taosHashGetKey(pIter, &key, &keyLen); + int32_t i = 0; - QW_GET_QTID(key, (*rsp)->status[i].queryId, (*rsp)->status[i].taskId); - (*rsp)->status[i].status = taskStatus->status; - - pIter = taosHashIterate(schStatus->taskStatus, pIter); - } + if (schStatus) { + void *pIter = taosHashIterate(schStatus->tasksHash, NULL); + while (pIter) { + SQWorkerTaskStatus *taskStatus = (SQWorkerTaskStatus *)pIter; + taosHashGetKey(pIter, &key, &keyLen); + + QW_GET_QTID(key, (*rsp)->status[i].queryId, (*rsp)->status[i].taskId); + (*rsp)->status[i].status = taskStatus->status; + + pIter = taosHashIterate(schStatus->tasksHash, pIter); + } + } + + if (schStatus) { + QW_UNLOCK(QW_READ, &schStatus->tasksLock); + qwReleaseScheduler(QW_READ, mgmt); + } (*rsp)->num = taskNum; return TSDB_CODE_SUCCESS; } -int32_t qwBuildRspMsg(void *data, int32_t msgType); + + +int32_t qwUpdateSchLastAccess(SQWorkerMgmt *mgmt, uint64_t schedulerId) { + SQWorkerSchStatus *schStatus = NULL; + + QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &schStatus)); + + schStatus->lastAccessTs = taosGetTimestampSec(); + + qwReleaseScheduler(QW_READ, mgmt); + + return TSDB_CODE_SUCCESS; +} + + +int32_t qwGetTaskStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, int8_t *taskStatus) { + SQWorkerSchStatus *sch = NULL; + SQWorkerTaskStatus *task = NULL; + int32_t code = 0; + + QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)); + + QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task)); + + *taskStatus = task->status; + +_return: + if (task) { + qwReleaseTask(QW_READ, sch); + } + + if (sch) { + qwReleaseScheduler(QW_READ, mgmt); + } + + QW_RET(code); +} + + +int32_t qwSwitchTaskStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, int8_t taskStatus) { + SQWorkerSchStatus *sch = NULL; + SQWorkerTaskStatus *task = NULL; + int32_t code = 0; + bool inserted = false; + + if (qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)) { + if (qwCheckStatusSwitch(JOB_TASK_STATUS_NULL, taskStatus)) { + qError("switch status error, not start to %d", taskStatus); + QW_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + } + + QW_ERR_RET(qwInsertAndAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)); + } + + if (qwAcquireTask(QW_READ, sch, queryId, taskId, &task)) { + if (qwCheckStatusSwitch(JOB_TASK_STATUS_NOT_START, taskStatus)) { + qwReleaseScheduler(QW_READ, mgmt); + qError("switch status error, not start to %d", taskStatus); + QW_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + } + + QW_ERR_JRET(qwInsertAndAcquireTask(QW_READ, sch, queryId, taskId, taskStatus, &inserted, &task)); + + if (inserted) { + qwReleaseTask(QW_READ, sch); + qwReleaseScheduler(QW_READ, mgmt); + return TSDB_CODE_SUCCESS; + } + + QW_LOCK(QW_WRITE, &task->lock); + code = qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &taskStatus); + QW_UNLOCK(QW_WRITE, &task->lock); + + qwReleaseTask(QW_READ, sch); + qwReleaseScheduler(QW_READ, mgmt); + + QW_RET(code); + } + + QW_LOCK(QW_WRITE, &task->lock); + code = qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &taskStatus); + QW_UNLOCK(QW_WRITE, &task->lock); + +_return: + + qwReleaseTask(QW_READ, sch); + qwReleaseScheduler(QW_READ, mgmt); + + QW_RET(code); +} + + +int32_t qwCancelTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId) { + SQWorkerSchStatus *sch = NULL; + SQWorkerTaskStatus *task = NULL; + int32_t code = 0; + + if (TSDB_CODE_SUCCESS != qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)) { + QW_ERR_RET(qwSwitchTaskStatus(mgmt, schedulerId, queryId, taskId, JOB_TASK_STATUS_NOT_START)); + + QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)); + } + + if (qwAcquireTask(QW_READ, sch, queryId, taskId, &task)) { + code = qwSwitchTaskStatus(mgmt, schedulerId, queryId, taskId, JOB_TASK_STATUS_NOT_START); + if (code) { + qwReleaseScheduler(QW_READ, mgmt); + QW_ERR_RET(code); + } + + QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task)); + } + + QW_LOCK(QW_WRITE, &task->lock); + + task->cancel = true; + + int8_t oriStatus = task->status; + int8_t newStatus = 0; + + if (task->status == JOB_TASK_STATUS_CANCELLED || task->status == JOB_TASK_STATUS_NOT_START || task->status == JOB_TASK_STATUS_CANCELLING || task->status == JOB_TASK_STATUS_DROPPING) { + QW_UNLOCK(QW_WRITE, &task->lock); + + qwReleaseTask(QW_READ, sch); + qwReleaseScheduler(QW_READ, mgmt); + + return TSDB_CODE_SUCCESS; + } else if (task->status == JOB_TASK_STATUS_FAILED || task->status == JOB_TASK_STATUS_SUCCEED || task->status == JOB_TASK_STATUS_PARTIAL_SUCCEED) { + newStatus = JOB_TASK_STATUS_CANCELLED; + QW_ERR_JRET(qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &newStatus)); + } else { + newStatus = JOB_TASK_STATUS_CANCELLING; + QW_ERR_JRET(qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &newStatus)); + } + + QW_UNLOCK(QW_WRITE, &task->lock); + qwReleaseTask(QW_READ, sch); + qwReleaseScheduler(QW_READ, mgmt); + + if (oriStatus == JOB_TASK_STATUS_EXECUTING) { + //TODO call executer to cancel subquery async + } + + return TSDB_CODE_SUCCESS; + +_return: + + if (task) { + QW_UNLOCK(QW_WRITE, &task->lock); + + qwReleaseTask(QW_READ, sch); + } + + if (sch) { + qwReleaseScheduler(QW_READ, mgmt); + } + + QW_RET(code); +} + + + +int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId) { + SQWorkerSchStatus *sch = NULL; + SQWorkerTaskStatus *task = NULL; + int32_t code = 0; + char id[sizeof(queryId) + sizeof(taskId)] = {0}; + QW_SET_QTID(id, queryId, taskId); + + QW_LOCK(QW_WRITE, &mgmt->resLock); + if (mgmt->resHash) { + taosHashRemove(mgmt->resHash, id, sizeof(id)); + } + QW_UNLOCK(QW_WRITE, &mgmt->resLock); + + if (TSDB_CODE_SUCCESS != qwAcquireScheduler(QW_WRITE, mgmt, schedulerId, &sch)) { + qWarn("scheduler %"PRIx64" doesn't exist", schedulerId); + return TSDB_CODE_SUCCESS; + } + + if (qwAcquireTask(QW_WRITE, sch, queryId, taskId, &task)) { + qwReleaseScheduler(QW_WRITE, mgmt); + + qWarn("scheduler %"PRIx64" queryId %"PRIx64" taskId:%"PRIx64" doesn't exist", schedulerId, queryId, taskId); + return TSDB_CODE_SUCCESS; + } + + taosHashRemove(sch->tasksHash, id, sizeof(id)); + + qwReleaseTask(QW_WRITE, sch); + qwReleaseScheduler(QW_WRITE, mgmt); + + return TSDB_CODE_SUCCESS; +} + + +int32_t qwCancelDropTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId) { + SQWorkerSchStatus *sch = NULL; + SQWorkerTaskStatus *task = NULL; + int32_t code = 0; + + if (TSDB_CODE_SUCCESS != qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)) { + qWarn("scheduler %"PRIx64" doesn't exist", schedulerId); + return TSDB_CODE_SUCCESS; + } + + if (qwAcquireTask(QW_READ, sch, queryId, taskId, &task)) { + qwReleaseScheduler(QW_READ, mgmt); + + qWarn("scheduler %"PRIx64" queryId %"PRIx64" taskId:%"PRIx64" doesn't exist", schedulerId, queryId, taskId); + return TSDB_CODE_SUCCESS; + } + + QW_LOCK(QW_WRITE, &task->lock); + + task->drop = true; + + int8_t oriStatus = task->status; + int8_t newStatus = 0; + + if (task->status == JOB_TASK_STATUS_EXECUTING) { + newStatus = JOB_TASK_STATUS_CANCELLING; + QW_ERR_JRET(qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &newStatus)); + } else if (task->status == JOB_TASK_STATUS_CANCELLING || task->status == JOB_TASK_STATUS_DROPPING || task->status == JOB_TASK_STATUS_NOT_START) { + QW_UNLOCK(QW_WRITE, &task->lock); + qwReleaseTask(QW_READ, sch); + qwReleaseScheduler(QW_READ, mgmt); + + return TSDB_CODE_SUCCESS; + } else { + QW_UNLOCK(QW_WRITE, &task->lock); + qwReleaseTask(QW_READ, sch); + qwReleaseScheduler(QW_READ, mgmt); + + QW_ERR_RET(qwDropTask(mgmt, schedulerId, queryId, taskId)); + return TSDB_CODE_SUCCESS; + } + + QW_UNLOCK(QW_WRITE, &task->lock); + qwReleaseTask(QW_READ, sch); + qwReleaseScheduler(QW_READ, mgmt); + + if (oriStatus == JOB_TASK_STATUS_EXECUTING) { + //TODO call executer to cancel subquery async + } + + return TSDB_CODE_SUCCESS; + +_return: + + if (task) { + QW_UNLOCK(QW_WRITE, &task->lock); + + qwReleaseTask(QW_READ, sch); + } + + if (sch) { + qwReleaseScheduler(QW_READ, mgmt); + } + + QW_RET(code); +} + + + +int32_t qwBuildAndSendQueryRsp(SRpcMsg *pMsg, int32_t code) { + +} + +int32_t qwBuildAndSendReadyRsp(SRpcMsg *pMsg, int32_t code) { + +} + +int32_t qwBuildAndSendStatusRsp(SRpcMsg *pMsg, SSchedulerStatusRsp *sStatus) { + +} + +int32_t qwBuildAndSendFetchRsp(SRpcMsg *pMsg, void *data) { + +} + + +int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg) { + +} + +int32_t qwBuildAndSendDropRsp(SRpcMsg *pMsg) { + +} + + + +int32_t qwCheckAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg, int32_t rspCode) { + SQWorkerSchStatus *sch = NULL; + SQWorkerTaskStatus *task = NULL; + int32_t code = 0; + + QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)); + + QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task)); + + QW_LOCK(QW_WRITE, &task->lock); + + if (QW_READY_NOT_RECEIVED == task->ready) { + QW_UNLOCK(QW_WRITE, &task->lock); + + qwReleaseTask(QW_READ, sch); + qwReleaseScheduler(QW_READ, mgmt); + + return TSDB_CODE_SUCCESS; + } else if (QW_READY_RECEIVED == task->ready) { + QW_ERR_JRET(qwBuildAndSendReadyRsp(pMsg, rspCode)); + + task->ready = QW_READY_RESPONSED; + } else if (QW_READY_RESPONSED == task->ready) { + qError("query response already send"); + QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); + } else { + assert(0); + } + +_return: + + if (task) { + QW_UNLOCK(QW_WRITE, &task->lock); + } + + if (sch) { + qwReleaseTask(QW_READ, sch); + } + + qwReleaseScheduler(QW_READ, mgmt); + + QW_RET(code); +} + +int32_t qwSetAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) { + SQWorkerSchStatus *sch = NULL; + SQWorkerTaskStatus *task = NULL; + int32_t code = 0; + + QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)); + + QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task)); + + QW_LOCK(QW_WRITE, &task->lock); + if (QW_TASK_READY_RESP(task->status)) { + QW_ERR_JRET(qwBuildAndSendReadyRsp(pMsg, task->code)); + + task->ready = QW_READY_RESPONSED; + } else { + task->ready = QW_READY_RECEIVED; + QW_UNLOCK(QW_WRITE, &task->lock); + + qwReleaseTask(QW_READ, sch); + qwReleaseScheduler(QW_READ, mgmt); + + return TSDB_CODE_SUCCESS; + } + +_return: + + if (task) { + QW_UNLOCK(QW_WRITE, &task->lock); + } + + if (sch) { + qwReleaseTask(QW_READ, sch); + } + + qwReleaseScheduler(QW_READ, mgmt); + + QW_RET(code); +} + +int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, bool *needStop) { + SQWorkerSchStatus *sch = NULL; + SQWorkerTaskStatus *task = NULL; + int32_t code = 0; + int8_t status = JOB_TASK_STATUS_CANCELLED; + + *needStop = false; + + if (qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)) { + return TSDB_CODE_SUCCESS; + } + + if (qwAcquireTask(QW_READ, sch, queryId, taskId, &task)) { + qwReleaseScheduler(QW_READ, mgmt); + return TSDB_CODE_SUCCESS; + } + + QW_LOCK(QW_READ, &task->lock); + + if ((!task->cancel) && (!task->drop)) { + QW_UNLOCK(QW_READ, &task->lock); + qwReleaseTask(QW_READ, sch); + qwReleaseScheduler(QW_READ, mgmt); + + return TSDB_CODE_SUCCESS; + } + + QW_UNLOCK(QW_READ, &task->lock); + + *needStop = true; + + if (task->cancel) { + QW_LOCK(QW_WRITE, &task->lock); + qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &status); + QW_UNLOCK(QW_WRITE, &task->lock); + } else if (task->drop) { + qwReleaseTask(QW_READ, sch); + qwReleaseScheduler(QW_READ, mgmt); + + qwDropTask(mgmt, schedulerId, queryId, taskId); + } + + return TSDB_CODE_SUCCESS; +} + + +int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) { + SQWorkerSchStatus *sch = NULL; + SQWorkerTaskStatus *task = NULL; + int32_t code = 0; + + QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)); + QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task)); + + QW_LOCK(QW_READ, &task->lock); + + if (task->status != JOB_TASK_STATUS_EXECUTING && task->status != JOB_TASK_STATUS_PARTIAL_SUCCEED && task->status != JOB_TASK_STATUS_SUCCEED) { + qError("invalid status %d for fetch", task->status); + QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); + } + + if (QW_GOT_RES_DATA(res->data)) { + QW_ERR_JRET(qwBuildAndSendFetchRsp(pMsg, res->data)); + if (QW_LOW_RES_DATA(res->data)) { + if (task->status == JOB_TASK_STATUS_PARTIAL_SUCCEED) { + //TODO add query back to queue + } + } + } else { + if (task->status != JOB_TASK_STATUS_EXECUTING) { + qError("invalid status %d for fetch without res", task->status); + QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); + } + + //TODO SET FLAG FOR QUERY TO SEND RSP WHEN RES READY + } + +_return: + if (task) { + QW_UNLOCK(QW_READ, &task->lock); + } + + if (sch) { + qwReleaseTask(QW_READ, sch); + } + + qwReleaseScheduler(QW_READ, mgmt); + + QW_RET(code); +} + +int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, int8_t status, int32_t errCode) { + SQWorkerSchStatus *sch = NULL; + SQWorkerTaskStatus *task = NULL; + int32_t code = 0; + int8_t newStatus = JOB_TASK_STATUS_CANCELLED; + + code = qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch); + if (code) { + qError("schedulerId:%"PRIx64" not in cache", schedulerId); + QW_ERR_RET(code); + } + + code = qwAcquireTask(QW_READ, sch, queryId, taskId, &task); + if (code) { + qwReleaseScheduler(QW_READ, mgmt); + qError("schedulerId:%"PRIx64" queryId:%"PRIx64" taskId:%"PRIx64" not in cache", schedulerId, queryId, taskId); + QW_ERR_RET(code); + } + + if (task->cancel) { + QW_LOCK(QW_WRITE, &task->lock); + qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &newStatus); + QW_UNLOCK(QW_WRITE, &task->lock); + } else if (task->drop) { + qwReleaseTask(QW_READ, sch); + qwReleaseScheduler(QW_READ, mgmt); + + qwDropTask(mgmt, schedulerId, queryId, taskId); + + return TSDB_CODE_SUCCESS; + } else { + QW_LOCK(QW_WRITE, &task->lock); + qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &status); + task->code = errCode; + QW_UNLOCK(QW_WRITE, &task->lock); + } + + qwReleaseTask(QW_READ, sch); + qwReleaseScheduler(QW_READ, mgmt); + + return TSDB_CODE_SUCCESS; +} int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt) { SQWorkerMgmt *mgmt = calloc(1, sizeof(SQWorkerMgmt)); if (NULL == mgmt) { qError("calloc %d failed", (int32_t)sizeof(SQWorkerMgmt)); - return TSDB_CODE_QRY_OUT_OF_MEMORY; + QW_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } if (cfg) { @@ -167,16 +812,16 @@ int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt) { mgmt->cfg.maxSchTaskNum = QWORKER_DEFAULT_SCH_TASK_NUMBER; } - mgmt->scheduleHash = taosHashInit(mgmt->cfg.maxSchedulerNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); - if (NULL == mgmt->scheduleHash) { + mgmt->schHash = taosHashInit(mgmt->cfg.maxSchedulerNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); + if (NULL == mgmt->schHash) { tfree(mgmt); QW_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler hash failed", mgmt->cfg.maxSchedulerNum); } - mgmt->resHash = taosHashInit(mgmt->cfg.maxResCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + mgmt->resHash = taosHashInit(mgmt->cfg.maxResCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); if (NULL == mgmt->resHash) { - taosHashCleanup(mgmt->scheduleHash); - mgmt->scheduleHash = NULL; + taosHashCleanup(mgmt->schHash); + mgmt->schHash = NULL; tfree(mgmt); QW_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d res cache hash failed", mgmt->cfg.maxResCacheNum); @@ -187,99 +832,179 @@ int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt) { return TSDB_CODE_SUCCESS; } -int32_t qWorkerProcessQueryMsg(void *qWorkerMgmt, SSchedulerQueryMsg *msg, SRpcMsg *rsp) { - if (NULL == qWorkerMgmt || NULL == msg || NULL == rsp) { - return TSDB_CODE_QRY_INVALID_INPUT; +int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg **rsp) { + if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == rsp) { + QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } + SSubQueryMsg *msg = pMsg->pCont; + if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { + QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + bool queryDone = false; + bool queryRsp = false; + bool needStop = false; SSubplan *plan = NULL; - SQWorkerTaskStatus *tStatus = NULL; + int32_t code = 0; - int32_t code = qStringToSubplan(msg->msg, &plan); + QW_ERR_JRET(qwCheckTaskCancelDrop(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, &needStop)); + if (needStop) { + qWarn("task need stop"); + QW_ERR_RET(TSDB_CODE_QRY_TASK_CANCELLED); + } + + code = qStringToSubplan(msg->msg, &plan); if (TSDB_CODE_SUCCESS != code) { qError("schId:%"PRIx64",qId:%"PRIx64",taskId:%"PRIx64" string to subplan failed, code:%d", msg->schedulerId, msg->queryId, msg->taskId, code); - return code; + QW_ERR_JRET(code); } - - //TODO call executer to init subquery - QW_ERR_JRET(qwAddTaskStatus(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, JOB_TASK_STATUS_EXECUTING)); + //TODO call executer to init subquery + code = 0; // return error directly + //TODO call executer to init subquery + + if (code) { + QW_ERR_JRET(code); + } else { + QW_ERR_JRET(qwSwitchTaskStatus(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, JOB_TASK_STATUS_EXECUTING)); + } - QW_ERR_JRET(qwBuildRspMsg(NULL, TSDB_MSG_TYPE_QUERY_RSP)); + QW_ERR_JRET(qwBuildAndSendQueryRsp(pMsg, TSDB_CODE_SUCCESS)); + queryRsp = true; + //TODO call executer to execute subquery - code = 0; + code = 0; void *data = NULL; + queryDone = false; //TODO call executer to execute subquery - QW_ERR_JRET(qwGetTaskStatus(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, &tStatus)); - - tStatus->status = (code) ? JOB_TASK_STATUS_FAILED : JOB_TASK_STATUS_SUCCEED; + if (code) { + QW_ERR_JRET(code); + } else { + QW_ERR_JRET(qwAddTaskResult(qWorkerMgmt, msg->queryId, msg->taskId, data)); - QW_ERR_JRET(qwAddTaskResult(qWorkerMgmt, msg->queryId, msg->taskId, data)); + QW_ERR_JRET(qwSwitchTaskStatus(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, JOB_TASK_STATUS_PARTIAL_SUCCEED)); + } _return: - if (tStatus && QW_TASK_DONE(tStatus->status) && QW_READY_RECEIVED == tStatus->ready) { - QW_ERR_RET(qwBuildRspMsg(NULL, TSDB_MSG_TYPE_RES_READY_RSP)); + if (queryRsp) { + code = qwCheckAndSendReadyRsp(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, pMsg, code); + } else { + code = qwBuildAndSendQueryRsp(pMsg, code); } + + int8_t status = 0; + if (TSDB_CODE_SUCCESS != code || queryDone) { + if (code) { + status = JOB_TASK_STATUS_FAILED; //TODO set CANCELLED from code + } else { + status = JOB_TASK_STATUS_SUCCEED; + } - qDestroySubplan(plan); + qwQueryPostProcess(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, status, code); + } - return code; + QW_RET(code); } -int32_t qWorkerProcessReadyMsg(void *qWorkerMgmt, SSchedulerReadyMsg *msg, SRpcMsg *rsp){ - if (NULL == qWorkerMgmt || NULL == msg || NULL == rsp) { +int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp){ + if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == rsp) { return TSDB_CODE_QRY_INVALID_INPUT; } - SQWorkerTaskStatus *tStatus = NULL; - - QW_ERR_RET(qwGetTaskStatus(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, &tStatus)); - - if (QW_TASK_DONE(tStatus->status)) { - QW_ERR_RET(qwBuildRspMsg(tStatus, TSDB_MSG_TYPE_RES_READY_RSP)); - } else { - tStatus->ready = QW_READY_RECEIVED; - - return TSDB_CODE_SUCCESS; - } + SResReadyMsg *msg = pMsg->pCont; + if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { + QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } - tStatus->ready = QW_READY_RESPONSED; + QW_ERR_RET(qwSetAndSendReadyRsp(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, pMsg)); return TSDB_CODE_SUCCESS; } -int32_t qWorkerProcessStatusMsg(void *qWorkerMgmt, SSchedulerStatusMsg *msg, SRpcMsg *rsp) { - if (NULL == qWorkerMgmt || NULL == msg || NULL == rsp) { +int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp) { + if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == rsp) { return TSDB_CODE_QRY_INVALID_INPUT; } + SSchTasksStatusMsg *msg = pMsg->pCont; + if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { + QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + SSchedulerStatusRsp *sStatus = NULL; QW_ERR_RET(qwGetSchTasksStatus(qWorkerMgmt, msg->schedulerId, &sStatus)); + QW_ERR_RET(qwBuildAndSendStatusRsp(pMsg, sStatus)); + return TSDB_CODE_SUCCESS; } -int32_t qWorkerProcessFetchMsg(void *qWorkerMgmt, SSchedulerFetchMsg *msg, SRpcMsg *rsp) { - if (NULL == qWorkerMgmt || NULL == msg || NULL == rsp) { +int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp) { + if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == rsp) { return TSDB_CODE_QRY_INVALID_INPUT; } + SResFetchMsg *msg = pMsg->pCont; + if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { + QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + QW_ERR_RET(qwUpdateSchLastAccess(qWorkerMgmt, msg->schedulerId)); void *data = NULL; + SQWorkerResCache *res = NULL; + int32_t code = 0; - QW_ERR_RET(qwGetTaskResult(qWorkerMgmt, msg->queryId, msg->taskId, &data)); + QW_ERR_RET(qwAcquireTaskResCache(QW_READ, qWorkerMgmt, msg->queryId, msg->taskId, &res)); + + QW_ERR_JRET(qwHandleFetch(res, qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, pMsg)); + +_return: + + qwReleaseTaskResCache(QW_READ, qWorkerMgmt); + + QW_RET(code); +} + +int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp) { + if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == rsp) { + return TSDB_CODE_QRY_INVALID_INPUT; + } + + STaskCancelMsg *msg = pMsg->pCont; + if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { + QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + QW_ERR_RET(qwCancelTask(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId)); + + QW_ERR_RET(qwBuildAndSendCancelRsp(pMsg)); + + return TSDB_CODE_SUCCESS; +} + +int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp) { + if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == rsp) { + return TSDB_CODE_QRY_INVALID_INPUT; + } + + STaskDropMsg *msg = pMsg->pCont; + if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { + QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + QW_ERR_RET(qwCancelDropTask(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId)); - QW_ERR_RET(qwBuildRspMsg(data, TSDB_MSG_TYPE_FETCH_RSP)); + QW_ERR_RET(qwBuildAndSendDropRsp(pMsg)); return TSDB_CODE_SUCCESS; } -int32_t qWorkerProcessCancelMsg(void *qWorkerMgmt, SSchedulerCancelMsg *msg, SRpcMsg *rsp); void qWorkerDestroy(void **qWorkerMgmt) { if (NULL == qWorkerMgmt || NULL == *qWorkerMgmt) { diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 99a9b06fe4..6020fbaa4d 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -296,44 +296,45 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { } int32_t len = strlen(task->msg); - msgSize = sizeof(SSchedulerQueryMsg) + len; + msgSize = sizeof(SSubQueryMsg) + len + 1; msg = calloc(1, msgSize); if (NULL == msg) { qError("calloc %d failed", msgSize); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SSchedulerQueryMsg *pMsg = msg; + SSubQueryMsg *pMsg = msg; pMsg->schedulerId = htobe64(schMgmt.schedulerId); pMsg->queryId = htobe64(job->queryId); pMsg->taskId = htobe64(task->taskId); pMsg->contentLen = htonl(len); memcpy(pMsg->msg, task->msg, len); + pMsg->msg[len] = 0; break; } case TSDB_MSG_TYPE_RES_READY: { - msgSize = sizeof(SSchedulerReadyMsg); + msgSize = sizeof(SResReadyMsg); msg = calloc(1, msgSize); if (NULL == msg) { qError("calloc %d failed", msgSize); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SSchedulerReadyMsg *pMsg = msg; + SResReadyMsg *pMsg = msg; pMsg->queryId = htobe64(job->queryId); pMsg->taskId = htobe64(task->taskId); break; } case TSDB_MSG_TYPE_FETCH: { - msgSize = sizeof(SSchedulerFetchMsg); + msgSize = sizeof(SResFetchMsg); msg = calloc(1, msgSize); if (NULL == msg) { qError("calloc %d failed", msgSize); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SSchedulerFetchMsg *pMsg = msg; + SResFetchMsg *pMsg = msg; pMsg->queryId = htobe64(job->queryId); pMsg->taskId = htobe64(task->taskId); break; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 43ac760643..65057501b9 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -323,7 +323,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INCONSISTAN, "File inconsistance in TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_TIME_CONDITION, "One valid time range condition expected") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_SYS_ERROR, "System error") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_INPUT, "invalid input") - +TAOS_DEFINE_ERROR(TSDB_CODE_QRY_SCH_NOT_EXIST, "Scheduler not exist") +TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_NOT_EXIST, "Task not exist") +TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_ALREADY_EXIST, "Task already exist") +TAOS_DEFINE_ERROR(TSDB_CODE_QRY_RES_CACHE_NOT_EXIST, "Task result cache not exist") // grant TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, "License expired") diff --git a/source/util/src/thash.c b/source/util/src/thash.c index c4f6f78106..cfe14f00e1 100644 --- a/source/util/src/thash.c +++ b/source/util/src/thash.c @@ -291,7 +291,7 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da // enable resize __rd_unlock(&pHashObj->lock, pHashObj->type); - return pHashObj->enableUpdate ? 0 : -1; + return pHashObj->enableUpdate ? 0 : -2; } } -- GitLab