diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 8229cae281af36b186da5238da0b36804de45c98..a8281d95a5e5b07cd8958457f53efc8f8ec96e1b 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -51,7 +51,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_QUERY, "mq-query" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_DISCONNECT, "mq-disconnect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_SET_CUR, "mq-set-cur" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_RSP_READY, "rsp-ready" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_RES_READY, "res-ready" ) // message from client to mnode TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONNECT, "connect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_ACCT, "create-acct" ) @@ -1094,6 +1094,7 @@ typedef struct { } SUpdateTagValRsp; typedef struct SSchedulerQueryMsg { + uint64_t schedulerId; uint64_t queryId; uint64_t taskId; uint32_t contentLen; @@ -1101,15 +1102,39 @@ typedef struct SSchedulerQueryMsg { } SSchedulerQueryMsg; typedef struct SSchedulerReadyMsg { + uint64_t schedulerId; uint64_t queryId; uint64_t taskId; } SSchedulerReadyMsg; typedef struct SSchedulerFetchMsg { + uint64_t schedulerId; uint64_t queryId; uint64_t taskId; } SSchedulerFetchMsg; +typedef struct SSchedulerStatusMsg { + uint64_t schedulerId; +} SSchedulerStatusMsg; + +typedef struct STaskStatus { + uint64_t queryId; + uint64_t taskId; + int8_t status; +} STaskStatus; + +typedef struct SSchedulerStatusRsp { + uint32_t num; + STaskStatus status[]; +} SSchedulerStatusRsp; + + +typedef struct SSchedulerCancelMsg { + uint64_t schedulerId; + uint64_t queryId; + uint64_t taskId; +} SSchedulerCancelMsg; + #pragma pack(pop) diff --git a/include/dnode/qnode/qnode.h b/include/dnode/qnode/qnode.h index 65779b209938bc9766f2d0d11881c2354d0c5092..aa4c3af392dfb2e94198d0062801fe405624752c 100644 --- a/include/dnode/qnode/qnode.h +++ b/include/dnode/qnode/qnode.h @@ -19,7 +19,7 @@ #ifdef __cplusplus extern "C" { #endif - +#include "trpc.h" typedef struct { uint64_t numOfStartTask; @@ -32,48 +32,6 @@ typedef struct { uint64_t numOfErrors; } SQnodeStat; -/* start Task msg */ -typedef struct { - uint32_t schedulerIp; - uint16_t schedulerPort; - int64_t taskId; - int64_t queryId; - uint32_t srcIp; - uint16_t srcPort; -} SQnodeStartTaskMsg; - -/* stop Task msg */ -typedef struct { - int64_t taskId; -} SQnodeStopTaskMsg; - -/* start/stop Task msg response */ -typedef struct { - int64_t taskId; - int32_t code; -} SQnodeTaskRespMsg; - -/* Task status msg */ -typedef struct { - int64_t taskId; - int32_t status; - int64_t queryId; -} SQnodeTaskStatusMsg; - -/* Qnode/Scheduler heartbeat msg */ -typedef struct { - int32_t status; - int32_t load; - -} SQnodeHeartbeatMsg; - -/* Qnode sent/received msg */ -typedef struct { - int8_t msgType; - int32_t msgLen; - char msg[]; -} SQnodeMsg; - /** * Start one Qnode in Dnode. diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 254d572149b6c01a439cd9e23470ff90c5a65936..4fcbc1c5282a7bf7f1339381ed17be8399a76ad4 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -24,6 +24,15 @@ extern "C" { #include "thash.h" #include "tlog.h" +enum { + JOB_TASK_STATUS_NOT_START = 1, + JOB_TASK_STATUS_EXECUTING, + JOB_TASK_STATUS_SUCCEED, + JOB_TASK_STATUS_FAILED, + JOB_TASK_STATUS_CANCELLING, + JOB_TASK_STATUS_CANCELLED +}; + typedef struct STableComInfo { uint8_t numOfTags; // the number of tags in schema uint8_t precision; // the number of precision diff --git a/include/libs/qworker/qworker.h b/include/libs/qworker/qworker.h new file mode 100644 index 0000000000000000000000000000000000000000..63a6b6f89bee7100abd91426b979163a99cfd641 --- /dev/null +++ b/include/libs/qworker/qworker.h @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_QWORKER_H_ +#define _TD_QWORKER_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include "trpc.h" + +typedef struct SQWorkerCfg { + uint32_t maxSchedulerNum; + uint32_t maxResCacheNum; + uint32_t maxSchTaskNum; +} SQWorkerCfg; + +typedef struct { + uint64_t numOfStartTask; + uint64_t numOfStopTask; + uint64_t numOfRecvedFetch; + uint64_t numOfSentHb; + uint64_t numOfSentFetch; + uint64_t numOfTaskInQueue; + uint64_t numOfFetchInQueue; + uint64_t numOfErrors; +} SQWorkerStat; + + +int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt); + +int32_t qWorkerProcessQueryMsg(void *qWorkerMgmt, SSchedulerQueryMsg *msg, SRpcMsg *rsp); + +int32_t qWorkerProcessReadyMsg(void *qWorkerMgmt, SSchedulerReadyMsg *msg, SRpcMsg *rsp); + +int32_t qWorkerProcessStatusMsg(void *qWorkerMgmt, SSchedulerStatusMsg *msg, SRpcMsg *rsp); + +int32_t qWorkerProcessFetchMsg(void *qWorkerMgmt, SSchedulerFetchMsg *msg, SRpcMsg *rsp); + +int32_t qWorkerProcessCancelMsg(void *qWorkerMgmt, SSchedulerCancelMsg *msg, SRpcMsg *rsp); + +void qWorkerDestroy(void **qWorkerMgmt); + + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_QWORKER_H_*/ diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 2cbf26f8772927f67c3a882465cd66414be20a93..975b10353871f269abecb70d4974836d3e4a44b2 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -25,6 +25,7 @@ extern "C" { typedef struct SSchedulerCfg { int32_t clusterType; + int32_t maxJobNum; } SSchedulerCfg; typedef struct SQueryProfileSummary { diff --git a/include/util/thash.h b/include/util/thash.h index d5038802c079f3ed3cddcef6c30c219459fc9917..ebdc91f0546a7ef9de3801f4eabf13f18c2cdf4c 100644 --- a/include/util/thash.h +++ b/include/util/thash.h @@ -193,11 +193,10 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p); /** * Get the corresponding key information for a given data in hash table - * @param pHashObj * @param data * @return */ -int32_t taosHashGetKey(SHashObj *pHashObj, void *data, void** key, size_t* keyLen); +int32_t taosHashGetKey(void *data, void** key, size_t* keyLen); #ifdef __cplusplus } diff --git a/source/libs/CMakeLists.txt b/source/libs/CMakeLists.txt index 636cf0a9a84453458ec0d06b84231f926a4637b5..027532bbb1d1514604fc8277862e764cec13ec51 100644 --- a/source/libs/CMakeLists.txt +++ b/source/libs/CMakeLists.txt @@ -11,3 +11,4 @@ add_subdirectory(executor) add_subdirectory(planner) add_subdirectory(function) add_subdirectory(qcom) +add_subdirectory(qworker) diff --git a/source/libs/executor/src/tfilter.c b/source/libs/executor/src/tfilter.c index 48662f34436d05c9c2ef873bb52d6e1aa515f4f3..97dccb5c7b69405f91934362262e339985a5060d 100644 --- a/source/libs/executor/src/tfilter.c +++ b/source/libs/executor/src/tfilter.c @@ -1175,7 +1175,7 @@ int32_t filterAddGroupUnitFromNode(SFilterInfo *info, tExprNode* tree, SArray *g void* key = NULL; len = 0; - taosHashGetKey((SHashObj *)data, p, &key, &len); + taosHashGetKey(p, &key, &len); void *fdata = NULL; if (IS_VAR_DATA_TYPE(type)) { diff --git a/source/libs/qworker/CMakeLists.txt b/source/libs/qworker/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..4eafa50bdc200615391cfc2f364fc9c9c20430bd --- /dev/null +++ b/source/libs/qworker/CMakeLists.txt @@ -0,0 +1,12 @@ +aux_source_directory(src QWORKER_SRC) +add_library(qworker ${QWORKER_SRC}) +target_include_directories( + qworker + PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/qworker" + PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" +) + +target_link_libraries( + qworker + PRIVATE os util transport planner qcom +) diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h new file mode 100644 index 0000000000000000000000000000000000000000..1adc09def41234a26d3557916eb40917e6e1b57e --- /dev/null +++ b/source/libs/qworker/inc/qworkerInt.h @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_QWORKER_INT_H_ +#define _TD_QWORKER_INT_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#define QWORKER_DEFAULT_SCHEDULER_NUMBER 10000 +#define QWORKER_DEFAULT_RES_CACHE_NUMBER 10000 +#define QWORKER_DEFAULT_SCH_TASK_NUMBER 10000 + +enum { + QW_READY_NOT_RECEIVED = 0, + QW_READY_RECEIVED, + QW_READY_RESPONSED, +}; + +typedef struct SQWorkerTaskStatus { + int8_t status; + int8_t ready; +} SQWorkerTaskStatus; + +typedef struct SQWorkerResCache { + void *data; +} SQWorkerResCache; + +typedef struct SQWorkerSchTaskStatus { + int32_t lastAccessTs; // timestamp in second + SHashObj *taskStatus; // key:queryId+taskId, value: SQWorkerTaskStatus +} SQWorkerSchTaskStatus; + +// Qnode/Vnode level task management +typedef struct SQWorkerMgmt { + SQWorkerCfg cfg; + SHashObj *scheduleHash; //key: schedulerId, value: SQWorkerSchTaskStatus + SHashObj *resHash; //key: queryId+taskId, value: SQWorkerResCache +} SQWorkerMgmt; + +#define QW_TASK_DONE(status) (status == JOB_TASK_STATUS_SUCCEED || status == JOB_TASK_STATUS_FAILED || status == status == JOB_TASK_STATUS_CANCELLED) +#define QW_SET_QTID(id, qid, tid) do { *(uint64_t *)(id) = (qid); *(uint64_t *)((char *)(id) + sizeof(qid)) = (tid); } while (0) +#define QW_GET_QTID(id, qid, tid) do { (qid) = *(uint64_t *)(id); (tid) = *(uint64_t *)((char *)(id) + sizeof(qid)); } while (0) + +#define QW_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) +#define QW_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) +#define QW_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); terrno = _code; return _code; } } while (0) +#define QW_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) + + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_QWORKER_INT_H_*/ diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c new file mode 100644 index 0000000000000000000000000000000000000000..82bfd75b6a1d2ffb22a4c11b6f286b185d6a643a --- /dev/null +++ b/source/libs/qworker/src/qworker.c @@ -0,0 +1,298 @@ +#include "taosmsg.h" +#include "query.h" +#include "qworker.h" +#include "qworkerInt.h" +#include "planner.h" + +int32_t qwAddTaskStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, int8_t taskStatus) { + SQWorkerTaskStatus tStatus = {0}; + tStatus.status = taskStatus; + + char id[sizeof(queryId) + sizeof(taskId)] = {0}; + QW_SET_QTID(id, queryId, taskId); + + SQWorkerSchTaskStatus *schStatus = taosHashGet(mgmt->scheduleHash, &schedulerId, sizeof(schedulerId)); + if (NULL == schStatus) { + SQWorkerSchTaskStatus newSchStatus = {0}; + newSchStatus.taskStatus = taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + if (NULL == newSchStatus.taskStatus) { + qError("taosHashInit %d failed", mgmt->cfg.maxSchTaskNum); + return TSDB_CODE_QRY_OUT_OF_MEMORY; + } + + if (0 != taosHashPut(newSchStatus.taskStatus, id, sizeof(id), &tStatus, sizeof(tStatus))) { + qError("taosHashPut schedulerId[%"PRIx64"]queryId[%"PRIx64"] taskId[%"PRIx64"] to scheduleHash failed", schedulerId, queryId, taskId); + taosHashCleanup(newSchStatus.taskStatus); + return TSDB_CODE_QRY_APP_ERROR; + } + + newSchStatus.lastAccessTs = taosGetTimestampSec(); + + if (0 != taosHashPut(mgmt->scheduleHash, &schedulerId, sizeof(schedulerId), &newSchStatus, sizeof(newSchStatus))) { + qError("taosHashPut schedulerId[%"PRIx64"] to scheduleHash failed", schedulerId); + taosHashCleanup(newSchStatus.taskStatus); + return TSDB_CODE_QRY_APP_ERROR; + } + + return TSDB_CODE_SUCCESS; + } + + schStatus->lastAccessTs = taosGetTimestampSec(); + + if (0 != taosHashPut(schStatus->taskStatus, id, sizeof(id), &tStatus, sizeof(tStatus))) { + qError("taosHashPut schedulerId[%"PRIx64"]queryId[%"PRIx64"] taskId[%"PRIx64"] to scheduleHash failed", schedulerId, queryId, taskId); + return TSDB_CODE_QRY_APP_ERROR; + } + + return TSDB_CODE_SUCCESS; +} + +int32_t qwGetTaskStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, SQWorkerTaskStatus **taskStatus) { + SQWorkerSchTaskStatus *schStatus = taosHashGet(mgmt->scheduleHash, &schedulerId, sizeof(schedulerId)); + if (NULL == schStatus) { + qError("no scheduler for schedulerId[%"PRIx64"]", schedulerId); + return TSDB_CODE_QRY_APP_ERROR; + } + + schStatus->lastAccessTs = taosGetTimestampSec(); + + char id[sizeof(queryId) + sizeof(taskId)] = {0}; + QW_SET_QTID(id, queryId, taskId); + + SQWorkerTaskStatus *tStatus = taosHashGet(schStatus->taskStatus, id, sizeof(id)); + if (NULL == tStatus) { + qError("no task status for schedulerId[%"PRIx64"] queryId[%"PRIx64"] taskId[%"PRIx64"]", schedulerId, queryId, taskId); + return TSDB_CODE_QRY_APP_ERROR; + } + + *taskStatus = tStatus; + + return TSDB_CODE_SUCCESS; +} + +int32_t qwAddTaskResult(SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, void *data) { + char id[sizeof(queryId) + sizeof(taskId)] = {0}; + QW_SET_QTID(id, queryId, taskId); + + SQWorkerResCache resCache = {0}; + resCache.data = data; + + if (0 != taosHashPut(mgmt->resHash, id, sizeof(id), &resCache, sizeof(SQWorkerResCache))) { + qError("taosHashPut queryId[%"PRIx64"] taskId[%"PRIx64"] to resHash failed", queryId, taskId); + return TSDB_CODE_QRY_APP_ERROR; + } + + return TSDB_CODE_SUCCESS; +} + + +int32_t qwGetTaskResult(SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, void **data) { + char id[sizeof(queryId) + sizeof(taskId)] = {0}; + QW_SET_QTID(id, queryId, taskId); + + SQWorkerResCache *resCache = taosHashGet(mgmt->resHash, id, sizeof(id)); + if (NULL == resCache) { + qError("no task res for queryId[%"PRIx64"] taskId[%"PRIx64"]", queryId, taskId); + return TSDB_CODE_QRY_APP_ERROR; + } + + *data = resCache->data; + + return TSDB_CODE_SUCCESS; +} + +int32_t qwUpdateSchLastAccess(SQWorkerMgmt *mgmt, uint64_t schedulerId) { + SQWorkerSchTaskStatus *schStatus = taosHashGet(mgmt->scheduleHash, &schedulerId, sizeof(schedulerId)); + if (NULL == schStatus) { + qError("no scheduler for schedulerId[%"PRIx64"]", schedulerId); + return TSDB_CODE_QRY_APP_ERROR; + } + + schStatus->lastAccessTs = taosGetTimestampSec(); + + return TSDB_CODE_SUCCESS; +} + +int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, SSchedulerStatusRsp **rsp) { + SQWorkerSchTaskStatus *schStatus = taosHashGet(mgmt->scheduleHash, &schedulerId, sizeof(schedulerId)); + if (NULL == schStatus) { + qError("no scheduler for schedulerId[%"PRIx64"]", schedulerId); + return TSDB_CODE_QRY_APP_ERROR; + } + + schStatus->lastAccessTs = taosGetTimestampSec(); + + int32_t i = 0; + int32_t taskNum = taosHashGetSize(schStatus->taskStatus); + int32_t size = sizeof(SSchedulerStatusRsp) + sizeof((*rsp)->status[0]) * taskNum; + *rsp = calloc(1, size); + if (NULL == *rsp) { + qError("calloc %d failed", size); + return TSDB_CODE_QRY_OUT_OF_MEMORY; + } + + void *key = NULL; + size_t keyLen = 0; + void *pIter = taosHashIterate(schStatus->taskStatus, NULL); + while (pIter) { + SQWorkerTaskStatus *taskStatus = (SQWorkerTaskStatus *)pIter; + taosHashGetKey(pIter, &key, &keyLen); + + QW_GET_QTID(key, (*rsp)->status[i].queryId, (*rsp)->status[i].taskId); + (*rsp)->status[i].status = taskStatus->status; + + pIter = taosHashIterate(schStatus->taskStatus, pIter); + } + + (*rsp)->num = taskNum; + + return TSDB_CODE_SUCCESS; +} + +int32_t qwBuildRspMsg(void *data, int32_t msgType); + + +int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt) { + SQWorkerMgmt *mgmt = calloc(1, sizeof(SQWorkerMgmt)); + if (NULL == mgmt) { + qError("calloc %d failed", (int32_t)sizeof(SQWorkerMgmt)); + return TSDB_CODE_QRY_OUT_OF_MEMORY; + } + + if (cfg) { + mgmt->cfg = *cfg; + } else { + mgmt->cfg.maxSchedulerNum = QWORKER_DEFAULT_SCHEDULER_NUMBER; + mgmt->cfg.maxResCacheNum = QWORKER_DEFAULT_RES_CACHE_NUMBER; + mgmt->cfg.maxSchTaskNum = QWORKER_DEFAULT_SCH_TASK_NUMBER; + } + + mgmt->scheduleHash = taosHashInit(mgmt->cfg.maxSchedulerNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); + if (NULL == mgmt->scheduleHash) { + tfree(mgmt); + QW_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler hash failed", mgmt->cfg.maxSchedulerNum); + } + + mgmt->resHash = taosHashInit(mgmt->cfg.maxResCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + if (NULL == mgmt->resHash) { + taosHashCleanup(mgmt->scheduleHash); + mgmt->scheduleHash = NULL; + tfree(mgmt); + + QW_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d res cache hash failed", mgmt->cfg.maxResCacheNum); + } + + *qWorkerMgmt = mgmt; + + return TSDB_CODE_SUCCESS; +} + +int32_t qWorkerProcessQueryMsg(void *qWorkerMgmt, SSchedulerQueryMsg *msg, SRpcMsg *rsp) { + if (NULL == qWorkerMgmt || NULL == msg || NULL == rsp) { + return TSDB_CODE_QRY_INVALID_INPUT; + } + + SSubplan *plan = NULL; + SQWorkerTaskStatus *tStatus = NULL; + + int32_t code = qStringToSubplan(msg->msg, &plan); + if (TSDB_CODE_SUCCESS != code) { + qError("schId:%"PRIx64",qId:%"PRIx64",taskId:%"PRIx64" string to subplan failed, code:%d", msg->schedulerId, msg->queryId, msg->taskId, code); + return code; + } + + //TODO call executer to init subquery + + QW_ERR_JRET(qwAddTaskStatus(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, JOB_TASK_STATUS_EXECUTING)); + + QW_ERR_JRET(qwBuildRspMsg(NULL, TSDB_MSG_TYPE_QUERY_RSP)); + + //TODO call executer to execute subquery + code = 0; + void *data = NULL; + //TODO call executer to execute subquery + + QW_ERR_JRET(qwGetTaskStatus(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, &tStatus)); + + tStatus->status = (code) ? JOB_TASK_STATUS_FAILED : JOB_TASK_STATUS_SUCCEED; + + QW_ERR_JRET(qwAddTaskResult(qWorkerMgmt, msg->queryId, msg->taskId, data)); + +_return: + + if (tStatus && QW_TASK_DONE(tStatus->status) && QW_READY_RECEIVED == tStatus->ready) { + QW_ERR_RET(qwBuildRspMsg(NULL, TSDB_MSG_TYPE_RES_READY_RSP)); + } + + qDestroySubplan(plan); + + return code; +} + +int32_t qWorkerProcessReadyMsg(void *qWorkerMgmt, SSchedulerReadyMsg *msg, SRpcMsg *rsp){ + if (NULL == qWorkerMgmt || NULL == msg || NULL == rsp) { + return TSDB_CODE_QRY_INVALID_INPUT; + } + + SQWorkerTaskStatus *tStatus = NULL; + + QW_ERR_RET(qwGetTaskStatus(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, &tStatus)); + + if (QW_TASK_DONE(tStatus->status)) { + QW_ERR_RET(qwBuildRspMsg(tStatus, TSDB_MSG_TYPE_RES_READY_RSP)); + } else { + tStatus->ready = QW_READY_RECEIVED; + + return TSDB_CODE_SUCCESS; + } + + tStatus->ready = QW_READY_RESPONSED; + + return TSDB_CODE_SUCCESS; +} + +int32_t qWorkerProcessStatusMsg(void *qWorkerMgmt, SSchedulerStatusMsg *msg, SRpcMsg *rsp) { + if (NULL == qWorkerMgmt || NULL == msg || NULL == rsp) { + return TSDB_CODE_QRY_INVALID_INPUT; + } + + SSchedulerStatusRsp *sStatus = NULL; + + QW_ERR_RET(qwGetSchTasksStatus(qWorkerMgmt, msg->schedulerId, &sStatus)); + + return TSDB_CODE_SUCCESS; +} + +int32_t qWorkerProcessFetchMsg(void *qWorkerMgmt, SSchedulerFetchMsg *msg, SRpcMsg *rsp) { + if (NULL == qWorkerMgmt || NULL == msg || NULL == rsp) { + return TSDB_CODE_QRY_INVALID_INPUT; + } + + QW_ERR_RET(qwUpdateSchLastAccess(qWorkerMgmt, msg->schedulerId)); + + void *data = NULL; + + QW_ERR_RET(qwGetTaskResult(qWorkerMgmt, msg->queryId, msg->taskId, &data)); + + QW_ERR_RET(qwBuildRspMsg(data, TSDB_MSG_TYPE_FETCH_RSP)); + + return TSDB_CODE_SUCCESS; +} + +int32_t qWorkerProcessCancelMsg(void *qWorkerMgmt, SSchedulerCancelMsg *msg, SRpcMsg *rsp); + +void qWorkerDestroy(void **qWorkerMgmt) { + if (NULL == qWorkerMgmt || NULL == *qWorkerMgmt) { + return; + } + + SQWorkerMgmt *mgmt = *qWorkerMgmt; + + //TODO STOP ALL QUERY + + //TODO FREE ALL + + tfree(*qWorkerMgmt); +} + + diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 73e7c4d24e1c2693d174dcb7acad4112939778c3..3fab91edacd77096fbacceee5df3f762452ad16d 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -31,17 +31,9 @@ extern "C" { #define SCH_MAX_CONDIDATE_EP_NUM TSDB_MAX_REPLICA -enum { - SCH_STATUS_NOT_START = 1, - SCH_STATUS_EXECUTING, - SCH_STATUS_SUCCEED, - SCH_STATUS_FAILED, - SCH_STATUS_CANCELLING, - SCH_STATUS_CANCELLED -}; - typedef struct SSchedulerMgmt { - uint64_t taskId; + uint64_t taskId; + uint64_t schedulerId; SSchedulerCfg cfg; SHashObj *Jobs; // key: queryId, value: SQueryJob* } SSchedulerMgmt; diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 6014ff9ab6282248f52bfe1aba0535e5b4d4cb71..99a9b06fe441832d42fbc5bc86ad24bcc3ebc263 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -161,7 +161,7 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { SArray *levelPlans = NULL; int32_t levelPlanNum = 0; - level.status = SCH_STATUS_NOT_START; + level.status = JOB_TASK_STATUS_NOT_START; for (int32_t i = 0; i < levelNum; ++i) { level.level = i; @@ -191,7 +191,7 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { task.taskId = atomic_add_fetch_64(&schMgmt.taskId, 1); task.plan = plan; - task.status = SCH_STATUS_NOT_START; + task.status = JOB_TASK_STATUS_NOT_START; void *p = taosArrayPush(level.subTasks, &task); if (NULL == p) { @@ -304,13 +304,15 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { } SSchedulerQueryMsg *pMsg = msg; - pMsg->queryId = job->queryId; - pMsg->taskId = task->taskId; - pMsg->contentLen = len; + + pMsg->schedulerId = htobe64(schMgmt.schedulerId); + pMsg->queryId = htobe64(job->queryId); + pMsg->taskId = htobe64(task->taskId); + pMsg->contentLen = htonl(len); memcpy(pMsg->msg, task->msg, len); break; } - case TSDB_MSG_TYPE_RSP_READY: { + case TSDB_MSG_TYPE_RES_READY: { msgSize = sizeof(SSchedulerReadyMsg); msg = calloc(1, msgSize); if (NULL == msg) { @@ -319,8 +321,8 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { } SSchedulerReadyMsg *pMsg = msg; - pMsg->queryId = job->queryId; - pMsg->taskId = task->taskId; + pMsg->queryId = htobe64(job->queryId); + pMsg->taskId = htobe64(task->taskId); break; } case TSDB_MSG_TYPE_FETCH: { @@ -332,8 +334,8 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { } SSchedulerFetchMsg *pMsg = msg; - pMsg->queryId = job->queryId; - pMsg->taskId = task->taskId; + pMsg->queryId = htobe64(job->queryId); + pMsg->taskId = htobe64(task->taskId); break; } default: @@ -376,7 +378,7 @@ _return: int32_t schProcessOnJobSuccess(SQueryJob *job) { - job->status = SCH_STATUS_SUCCEED; + job->status = JOB_TASK_STATUS_SUCCEED; if (job->userFetch) { SCH_ERR_RET(schFetchFromRemote(job)); @@ -386,7 +388,7 @@ int32_t schProcessOnJobSuccess(SQueryJob *job) { } int32_t schProcessOnJobFailure(SQueryJob *job) { - job->status = SCH_STATUS_FAILED; + job->status = JOB_TASK_STATUS_FAILED; atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0); @@ -413,7 +415,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { return TSDB_CODE_SUCCESS; } - task->status = SCH_STATUS_SUCCEED; + task->status = JOB_TASK_STATUS_SUCCEED; int32_t parentNum = (int32_t)taosArrayGetSize(task->parents); if (parentNum == 0) { @@ -459,7 +461,7 @@ int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCod if (!needRetry) { SCH_TASK_ERR_LOG("task failed[%x], no more retry", errCode); - job->status = SCH_STATUS_FAILED; + job->status = JOB_TASK_STATUS_FAILED; SCH_ERR_RET(schProcessOnJobFailure(job)); return TSDB_CODE_SUCCESS; @@ -478,13 +480,13 @@ int32_t schHandleRspMsg(SQueryJob *job, SQueryTask *task, int32_t msgType, int32 if (rspCode != TSDB_CODE_SUCCESS) { SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode)); } else { - code = schAsyncSendMsg(job, task, TSDB_MSG_TYPE_RSP_READY); + code = schAsyncSendMsg(job, task, TSDB_MSG_TYPE_RES_READY); if (code) { goto _task_error; } } break; - case TSDB_MSG_TYPE_RSP_READY: + case TSDB_MSG_TYPE_RES_READY: if (rspCode != TSDB_CODE_SUCCESS) { SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode)); } else { @@ -534,7 +536,7 @@ int32_t schLaunchTask(SQueryJob *job, SQueryTask *task) { SCH_ERR_RET(schPushTaskToExecList(job, task)); - task->status = SCH_STATUS_EXECUTING; + task->status = JOB_TASK_STATUS_EXECUTING; return TSDB_CODE_SUCCESS; } @@ -546,22 +548,26 @@ int32_t schLaunchJob(SQueryJob *job) { SCH_ERR_RET(schLaunchTask(job, task)); } - job->status = SCH_STATUS_EXECUTING; + job->status = JOB_TASK_STATUS_EXECUTING; return TSDB_CODE_SUCCESS; } int32_t schedulerInit(SSchedulerCfg *cfg) { - schMgmt.Jobs = taosHashInit(SCHEDULE_DEFAULT_JOB_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); - if (NULL == schMgmt.Jobs) { - SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", SCHEDULE_DEFAULT_JOB_NUMBER); - } - if (cfg) { schMgmt.cfg = *cfg; + } else { + schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER; } + schMgmt.Jobs = taosHashInit(schMgmt.cfg.maxJobNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); + if (NULL == schMgmt.Jobs) { + SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", schMgmt.cfg.maxJobNum); + } + + schMgmt.schedulerId = 1; //TODO GENERATE A UUID + return TSDB_CODE_SUCCESS; } @@ -605,7 +611,7 @@ int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, voi SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); } - job->status = SCH_STATUS_NOT_START; + job->status = JOB_TASK_STATUS_NOT_START; SCH_ERR_JRET(schLaunchJob(job)); @@ -634,7 +640,7 @@ int32_t scheduleFetchRows(void *pJob, void **data) { return TSDB_CODE_QRY_APP_ERROR; } - if (job->status == SCH_STATUS_SUCCEED) { + if (job->status == JOB_TASK_STATUS_SUCCEED) { SCH_ERR_JRET(schFetchFromRemote(job)); } @@ -668,7 +674,7 @@ void scheduleFreeJob(void *pJob) { return; } - if (job->status == SCH_STATUS_EXECUTING) { + if (job->status == JOB_TASK_STATUS_EXECUTING) { scheduleCancelJob(pJob); } } diff --git a/source/util/src/thash.c b/source/util/src/thash.c index bfc6b47fa6abd80245b021954ee908156e9eebeb..c4f6f78106e1ee6703e0429e705f62ab83db1881 100644 --- a/source/util/src/thash.c +++ b/source/util/src/thash.c @@ -776,9 +776,16 @@ size_t taosHashGetMemSize(const SHashObj *pHashObj) { return (pHashObj->capacity * (sizeof(SHashEntry) + POINTER_BYTES)) + sizeof(SHashNode) * taosHashGetSize(pHashObj) + sizeof(SHashObj); } -FORCE_INLINE void *taosHashGetDataKey(SHashObj *pHashObj, void *data) { +FORCE_INLINE int32_t taosHashGetKey(void *data, void** key, size_t* keyLen) { + if (NULL == data || NULL == key) { + return -1; + } + SHashNode * node = GET_HASH_PNODE(data); - return GET_HASH_NODE_KEY(node); + *key = GET_HASH_NODE_KEY(node); + *keyLen = node->keyLen; + + return 0; } FORCE_INLINE uint32_t taosHashGetDataKeyLen(SHashObj *pHashObj, void *data) {