diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h
index ee4af6041ea0c4f6a0f5ecc800c1edd798c84cb9..0efaf038de6c217a72e74dc7c025bd50c06215d7 100644
--- a/include/common/taosmsg.h
+++ b/include/common/taosmsg.h
@@ -51,7 +51,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_QUERY, "mq-query" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_DISCONNECT, "mq-disconnect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_SET, "mq-set" )
-TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_RSP_READY, "rsp-ready" )
+TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_RES_READY, "res-ready" )
// message from client to mnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONNECT, "connect" )
@@ -1090,11 +1090,13 @@ typedef struct SSchedulerQueryMsg {
} SSchedulerQueryMsg;
typedef struct SSchedulerReadyMsg {
+ uint64_t schedulerId;
uint64_t queryId;
uint64_t taskId;
} SSchedulerReadyMsg;
typedef struct SSchedulerFetchMsg {
+ uint64_t schedulerId;
uint64_t queryId;
uint64_t taskId;
} SSchedulerFetchMsg;
@@ -1103,6 +1105,24 @@ typedef struct SSchedulerStatusMsg {
uint64_t schedulerId;
} SSchedulerStatusMsg;
+typedef struct STaskStatus {
+ uint64_t queryId;
+ uint64_t taskId;
+ int8_t status;
+} STaskStatus;
+
+typedef struct SSchedulerStatusRsp {
+ uint32_t num;
+ STaskStatus status[];
+} SSchedulerStatusRsp;
+
+
+typedef struct SSchedulerCancelMsg {
+ uint64_t schedulerId;
+ uint64_t queryId;
+ uint64_t taskId;
+} SSchedulerCancelMsg;
+
#pragma pack(pop)
diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h
index 254d572149b6c01a439cd9e23470ff90c5a65936..4fcbc1c5282a7bf7f1339381ed17be8399a76ad4 100644
--- a/include/libs/qcom/query.h
+++ b/include/libs/qcom/query.h
@@ -24,6 +24,15 @@ extern "C" {
#include "thash.h"
#include "tlog.h"
+enum {
+ JOB_TASK_STATUS_NOT_START = 1,
+ JOB_TASK_STATUS_EXECUTING,
+ JOB_TASK_STATUS_SUCCEED,
+ JOB_TASK_STATUS_FAILED,
+ JOB_TASK_STATUS_CANCELLING,
+ JOB_TASK_STATUS_CANCELLED
+};
+
typedef struct STableComInfo {
uint8_t numOfTags; // the number of tags in schema
uint8_t precision; // the number of precision
diff --git a/include/libs/qcom/queryWorker.h b/include/libs/qworker/qworker.h
similarity index 60%
rename from include/libs/qcom/queryWorker.h
rename to include/libs/qworker/qworker.h
index 9a3d63ae65c8a9410f69d10ccba024f17a924477..63a6b6f89bee7100abd91426b979163a99cfd641 100644
--- a/include/libs/qcom/queryWorker.h
+++ b/include/libs/qworker/qworker.h
@@ -13,16 +13,19 @@
* along with this program. If not, see .
*/
-#ifndef _TD_QUERY_WORKER_H_
-#define _TD_QUERY_WORKER_H_
+#ifndef _TD_QWORKER_H_
+#define _TD_QWORKER_H_
#ifdef __cplusplus
extern "C" {
#endif
+#include "trpc.h"
+
typedef struct SQWorkerCfg {
uint32_t maxSchedulerNum;
uint32_t maxResCacheNum;
+ uint32_t maxSchTaskNum;
} SQWorkerCfg;
typedef struct {
@@ -37,14 +40,23 @@ typedef struct {
} SQWorkerStat;
-int32_t qWorkerInit(SQWorkerCfg *cfg);
+int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt);
+
+int32_t qWorkerProcessQueryMsg(void *qWorkerMgmt, SSchedulerQueryMsg *msg, SRpcMsg *rsp);
+
+int32_t qWorkerProcessReadyMsg(void *qWorkerMgmt, SSchedulerReadyMsg *msg, SRpcMsg *rsp);
+
+int32_t qWorkerProcessStatusMsg(void *qWorkerMgmt, SSchedulerStatusMsg *msg, SRpcMsg *rsp);
+
+int32_t qWorkerProcessFetchMsg(void *qWorkerMgmt, SSchedulerFetchMsg *msg, SRpcMsg *rsp);
-int32_t qWorkerProcessQueryMsg(char *msg, int32_t msgLen, int32_t *code, char **rspMsg);
+int32_t qWorkerProcessCancelMsg(void *qWorkerMgmt, SSchedulerCancelMsg *msg, SRpcMsg *rsp);
+void qWorkerDestroy(void **qWorkerMgmt);
#ifdef __cplusplus
}
#endif
-#endif /*_TD_QUERY_WORKER_H_*/
+#endif /*_TD_QWORKER_H_*/
diff --git a/include/util/thash.h b/include/util/thash.h
index d5038802c079f3ed3cddcef6c30c219459fc9917..ebdc91f0546a7ef9de3801f4eabf13f18c2cdf4c 100644
--- a/include/util/thash.h
+++ b/include/util/thash.h
@@ -193,11 +193,10 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p);
/**
* Get the corresponding key information for a given data in hash table
- * @param pHashObj
* @param data
* @return
*/
-int32_t taosHashGetKey(SHashObj *pHashObj, void *data, void** key, size_t* keyLen);
+int32_t taosHashGetKey(void *data, void** key, size_t* keyLen);
#ifdef __cplusplus
}
diff --git a/source/libs/CMakeLists.txt b/source/libs/CMakeLists.txt
index 636cf0a9a84453458ec0d06b84231f926a4637b5..027532bbb1d1514604fc8277862e764cec13ec51 100644
--- a/source/libs/CMakeLists.txt
+++ b/source/libs/CMakeLists.txt
@@ -11,3 +11,4 @@ add_subdirectory(executor)
add_subdirectory(planner)
add_subdirectory(function)
add_subdirectory(qcom)
+add_subdirectory(qworker)
diff --git a/source/libs/executor/src/tfilter.c b/source/libs/executor/src/tfilter.c
index 48662f34436d05c9c2ef873bb52d6e1aa515f4f3..97dccb5c7b69405f91934362262e339985a5060d 100644
--- a/source/libs/executor/src/tfilter.c
+++ b/source/libs/executor/src/tfilter.c
@@ -1175,7 +1175,7 @@ int32_t filterAddGroupUnitFromNode(SFilterInfo *info, tExprNode* tree, SArray *g
void* key = NULL;
len = 0;
- taosHashGetKey((SHashObj *)data, p, &key, &len);
+ taosHashGetKey(p, &key, &len);
void *fdata = NULL;
if (IS_VAR_DATA_TYPE(type)) {
diff --git a/source/libs/qcom/inc/qWorkerInt.h b/source/libs/qcom/inc/qWorkerInt.h
deleted file mode 100644
index ce87e18c6aabf1dd64a52149a9860eb7e83dcf84..0000000000000000000000000000000000000000
--- a/source/libs/qcom/inc/qWorkerInt.h
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Copyright (c) 2019 TAOS Data, Inc.
- *
- * This program is free software: you can use, redistribute, and/or modify
- * it under the terms of the GNU Affero General Public License, version 3
- * or later ("AGPL"), as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-
-#ifndef _TD_QWORKER_INT_H_
-#define _TD_QWORKER_INT_H_
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-#define QWORKER_DEFAULT_SCHEDULER_NUMBER 10000
-#define QWORKER_DEFAULT_RES_CACHE_NUMBER 10000
-
-typedef struct SQWorkerMgmt {
- SQWorkerCfg cfg;
- SHashObj *scheduleHash; //key: schedulerId, value:
- SHashObj *resHash; //key: queryId+taskId, value:
-} SQWorkerMgmt;
-
-
-#ifdef __cplusplus
-}
-#endif
-
-#endif /*_TD_QWORKER_INT_H_*/
diff --git a/source/libs/qcom/src/queryWorker.c b/source/libs/qcom/src/queryWorker.c
deleted file mode 100644
index 580a340834e403a74bce6a4da26bc42c2a73b8e2..0000000000000000000000000000000000000000
--- a/source/libs/qcom/src/queryWorker.c
+++ /dev/null
@@ -1,38 +0,0 @@
-#include "queryInt.h"
-#include "query.h"
-#include "taosmsg.h"
-#include "queryWorker.h"
-#include "qWorkerInt.h"
-
-SQWorkerMgmt qWorkerMgmt = {0};
-
-int32_t qWorkerInit(SQWorkerCfg *cfg) {
- if (cfg) {
- qWorkerMgmt.cfg = *cfg;
- } else {
- qWorkerMgmt.cfg.maxSchedulerNum = QWORKER_DEFAULT_SCHEDULER_NUMBER;
- qWorkerMgmt.cfg.maxResCacheNum = QWORKER_DEFAULT_RES_CACHE_NUMBER;
- }
-
- qWorkerMgmt.scheduleHash = taosHashInit(qWorkerMgmt.cfg.maxSchedulerNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
- if (NULL == qWorkerMgmt.scheduleHash) {
- SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler hash failed", qWorkerMgmt.cfg.maxSchedulerNum);
- }
-
- qWorkerMgmt.resHash = taosHashInit(qWorkerMgmt.cfg.maxResCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
- if (NULL == qWorkerMgmt.resHash) {
- taosHashCleanup(qWorkerMgmt.scheduleHash);
- qWorkerMgmt.scheduleHash = NULL;
-
- SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d res cache hash failed", qWorkerMgmt.cfg.maxResCacheNum);
- }
-
- return TSDB_CODE_SUCCESS;
-}
-
-int32_t qWorkerProcessQueryMsg(SSchedulerQueryMsg *msg, int32_t msgLen, int32_t *code, char **rspMsg) {
-
- // TODO
- return 0;
-}
-
diff --git a/source/libs/qworker/CMakeLists.txt b/source/libs/qworker/CMakeLists.txt
new file mode 100644
index 0000000000000000000000000000000000000000..4eafa50bdc200615391cfc2f364fc9c9c20430bd
--- /dev/null
+++ b/source/libs/qworker/CMakeLists.txt
@@ -0,0 +1,12 @@
+aux_source_directory(src QWORKER_SRC)
+add_library(qworker ${QWORKER_SRC})
+target_include_directories(
+ qworker
+ PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/qworker"
+ PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
+)
+
+target_link_libraries(
+ qworker
+ PRIVATE os util transport planner qcom
+)
diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h
new file mode 100644
index 0000000000000000000000000000000000000000..1adc09def41234a26d3557916eb40917e6e1b57e
--- /dev/null
+++ b/source/libs/qworker/inc/qworkerInt.h
@@ -0,0 +1,68 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#ifndef _TD_QWORKER_INT_H_
+#define _TD_QWORKER_INT_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#define QWORKER_DEFAULT_SCHEDULER_NUMBER 10000
+#define QWORKER_DEFAULT_RES_CACHE_NUMBER 10000
+#define QWORKER_DEFAULT_SCH_TASK_NUMBER 10000
+
+enum {
+ QW_READY_NOT_RECEIVED = 0,
+ QW_READY_RECEIVED,
+ QW_READY_RESPONSED,
+};
+
+typedef struct SQWorkerTaskStatus {
+ int8_t status;
+ int8_t ready;
+} SQWorkerTaskStatus;
+
+typedef struct SQWorkerResCache {
+ void *data;
+} SQWorkerResCache;
+
+typedef struct SQWorkerSchTaskStatus {
+ int32_t lastAccessTs; // timestamp in second
+ SHashObj *taskStatus; // key:queryId+taskId, value: SQWorkerTaskStatus
+} SQWorkerSchTaskStatus;
+
+// Qnode/Vnode level task management
+typedef struct SQWorkerMgmt {
+ SQWorkerCfg cfg;
+ SHashObj *scheduleHash; //key: schedulerId, value: SQWorkerSchTaskStatus
+ SHashObj *resHash; //key: queryId+taskId, value: SQWorkerResCache
+} SQWorkerMgmt;
+
+#define QW_TASK_DONE(status) (status == JOB_TASK_STATUS_SUCCEED || status == JOB_TASK_STATUS_FAILED || status == status == JOB_TASK_STATUS_CANCELLED)
+#define QW_SET_QTID(id, qid, tid) do { *(uint64_t *)(id) = (qid); *(uint64_t *)((char *)(id) + sizeof(qid)) = (tid); } while (0)
+#define QW_GET_QTID(id, qid, tid) do { (qid) = *(uint64_t *)(id); (tid) = *(uint64_t *)((char *)(id) + sizeof(qid)); } while (0)
+
+#define QW_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
+#define QW_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
+#define QW_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); terrno = _code; return _code; } } while (0)
+#define QW_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /*_TD_QWORKER_INT_H_*/
diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c
new file mode 100644
index 0000000000000000000000000000000000000000..82bfd75b6a1d2ffb22a4c11b6f286b185d6a643a
--- /dev/null
+++ b/source/libs/qworker/src/qworker.c
@@ -0,0 +1,298 @@
+#include "taosmsg.h"
+#include "query.h"
+#include "qworker.h"
+#include "qworkerInt.h"
+#include "planner.h"
+
+int32_t qwAddTaskStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, int8_t taskStatus) {
+ SQWorkerTaskStatus tStatus = {0};
+ tStatus.status = taskStatus;
+
+ char id[sizeof(queryId) + sizeof(taskId)] = {0};
+ QW_SET_QTID(id, queryId, taskId);
+
+ SQWorkerSchTaskStatus *schStatus = taosHashGet(mgmt->scheduleHash, &schedulerId, sizeof(schedulerId));
+ if (NULL == schStatus) {
+ SQWorkerSchTaskStatus newSchStatus = {0};
+ newSchStatus.taskStatus = taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
+ if (NULL == newSchStatus.taskStatus) {
+ qError("taosHashInit %d failed", mgmt->cfg.maxSchTaskNum);
+ return TSDB_CODE_QRY_OUT_OF_MEMORY;
+ }
+
+ if (0 != taosHashPut(newSchStatus.taskStatus, id, sizeof(id), &tStatus, sizeof(tStatus))) {
+ qError("taosHashPut schedulerId[%"PRIx64"]queryId[%"PRIx64"] taskId[%"PRIx64"] to scheduleHash failed", schedulerId, queryId, taskId);
+ taosHashCleanup(newSchStatus.taskStatus);
+ return TSDB_CODE_QRY_APP_ERROR;
+ }
+
+ newSchStatus.lastAccessTs = taosGetTimestampSec();
+
+ if (0 != taosHashPut(mgmt->scheduleHash, &schedulerId, sizeof(schedulerId), &newSchStatus, sizeof(newSchStatus))) {
+ qError("taosHashPut schedulerId[%"PRIx64"] to scheduleHash failed", schedulerId);
+ taosHashCleanup(newSchStatus.taskStatus);
+ return TSDB_CODE_QRY_APP_ERROR;
+ }
+
+ return TSDB_CODE_SUCCESS;
+ }
+
+ schStatus->lastAccessTs = taosGetTimestampSec();
+
+ if (0 != taosHashPut(schStatus->taskStatus, id, sizeof(id), &tStatus, sizeof(tStatus))) {
+ qError("taosHashPut schedulerId[%"PRIx64"]queryId[%"PRIx64"] taskId[%"PRIx64"] to scheduleHash failed", schedulerId, queryId, taskId);
+ return TSDB_CODE_QRY_APP_ERROR;
+ }
+
+ return TSDB_CODE_SUCCESS;
+}
+
+int32_t qwGetTaskStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, SQWorkerTaskStatus **taskStatus) {
+ SQWorkerSchTaskStatus *schStatus = taosHashGet(mgmt->scheduleHash, &schedulerId, sizeof(schedulerId));
+ if (NULL == schStatus) {
+ qError("no scheduler for schedulerId[%"PRIx64"]", schedulerId);
+ return TSDB_CODE_QRY_APP_ERROR;
+ }
+
+ schStatus->lastAccessTs = taosGetTimestampSec();
+
+ char id[sizeof(queryId) + sizeof(taskId)] = {0};
+ QW_SET_QTID(id, queryId, taskId);
+
+ SQWorkerTaskStatus *tStatus = taosHashGet(schStatus->taskStatus, id, sizeof(id));
+ if (NULL == tStatus) {
+ qError("no task status for schedulerId[%"PRIx64"] queryId[%"PRIx64"] taskId[%"PRIx64"]", schedulerId, queryId, taskId);
+ return TSDB_CODE_QRY_APP_ERROR;
+ }
+
+ *taskStatus = tStatus;
+
+ return TSDB_CODE_SUCCESS;
+}
+
+int32_t qwAddTaskResult(SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, void *data) {
+ char id[sizeof(queryId) + sizeof(taskId)] = {0};
+ QW_SET_QTID(id, queryId, taskId);
+
+ SQWorkerResCache resCache = {0};
+ resCache.data = data;
+
+ if (0 != taosHashPut(mgmt->resHash, id, sizeof(id), &resCache, sizeof(SQWorkerResCache))) {
+ qError("taosHashPut queryId[%"PRIx64"] taskId[%"PRIx64"] to resHash failed", queryId, taskId);
+ return TSDB_CODE_QRY_APP_ERROR;
+ }
+
+ return TSDB_CODE_SUCCESS;
+}
+
+
+int32_t qwGetTaskResult(SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, void **data) {
+ char id[sizeof(queryId) + sizeof(taskId)] = {0};
+ QW_SET_QTID(id, queryId, taskId);
+
+ SQWorkerResCache *resCache = taosHashGet(mgmt->resHash, id, sizeof(id));
+ if (NULL == resCache) {
+ qError("no task res for queryId[%"PRIx64"] taskId[%"PRIx64"]", queryId, taskId);
+ return TSDB_CODE_QRY_APP_ERROR;
+ }
+
+ *data = resCache->data;
+
+ return TSDB_CODE_SUCCESS;
+}
+
+int32_t qwUpdateSchLastAccess(SQWorkerMgmt *mgmt, uint64_t schedulerId) {
+ SQWorkerSchTaskStatus *schStatus = taosHashGet(mgmt->scheduleHash, &schedulerId, sizeof(schedulerId));
+ if (NULL == schStatus) {
+ qError("no scheduler for schedulerId[%"PRIx64"]", schedulerId);
+ return TSDB_CODE_QRY_APP_ERROR;
+ }
+
+ schStatus->lastAccessTs = taosGetTimestampSec();
+
+ return TSDB_CODE_SUCCESS;
+}
+
+int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, SSchedulerStatusRsp **rsp) {
+ SQWorkerSchTaskStatus *schStatus = taosHashGet(mgmt->scheduleHash, &schedulerId, sizeof(schedulerId));
+ if (NULL == schStatus) {
+ qError("no scheduler for schedulerId[%"PRIx64"]", schedulerId);
+ return TSDB_CODE_QRY_APP_ERROR;
+ }
+
+ schStatus->lastAccessTs = taosGetTimestampSec();
+
+ int32_t i = 0;
+ int32_t taskNum = taosHashGetSize(schStatus->taskStatus);
+ int32_t size = sizeof(SSchedulerStatusRsp) + sizeof((*rsp)->status[0]) * taskNum;
+ *rsp = calloc(1, size);
+ if (NULL == *rsp) {
+ qError("calloc %d failed", size);
+ return TSDB_CODE_QRY_OUT_OF_MEMORY;
+ }
+
+ void *key = NULL;
+ size_t keyLen = 0;
+ void *pIter = taosHashIterate(schStatus->taskStatus, NULL);
+ while (pIter) {
+ SQWorkerTaskStatus *taskStatus = (SQWorkerTaskStatus *)pIter;
+ taosHashGetKey(pIter, &key, &keyLen);
+
+ QW_GET_QTID(key, (*rsp)->status[i].queryId, (*rsp)->status[i].taskId);
+ (*rsp)->status[i].status = taskStatus->status;
+
+ pIter = taosHashIterate(schStatus->taskStatus, pIter);
+ }
+
+ (*rsp)->num = taskNum;
+
+ return TSDB_CODE_SUCCESS;
+}
+
+int32_t qwBuildRspMsg(void *data, int32_t msgType);
+
+
+int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt) {
+ SQWorkerMgmt *mgmt = calloc(1, sizeof(SQWorkerMgmt));
+ if (NULL == mgmt) {
+ qError("calloc %d failed", (int32_t)sizeof(SQWorkerMgmt));
+ return TSDB_CODE_QRY_OUT_OF_MEMORY;
+ }
+
+ if (cfg) {
+ mgmt->cfg = *cfg;
+ } else {
+ mgmt->cfg.maxSchedulerNum = QWORKER_DEFAULT_SCHEDULER_NUMBER;
+ mgmt->cfg.maxResCacheNum = QWORKER_DEFAULT_RES_CACHE_NUMBER;
+ mgmt->cfg.maxSchTaskNum = QWORKER_DEFAULT_SCH_TASK_NUMBER;
+ }
+
+ mgmt->scheduleHash = taosHashInit(mgmt->cfg.maxSchedulerNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
+ if (NULL == mgmt->scheduleHash) {
+ tfree(mgmt);
+ QW_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler hash failed", mgmt->cfg.maxSchedulerNum);
+ }
+
+ mgmt->resHash = taosHashInit(mgmt->cfg.maxResCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
+ if (NULL == mgmt->resHash) {
+ taosHashCleanup(mgmt->scheduleHash);
+ mgmt->scheduleHash = NULL;
+ tfree(mgmt);
+
+ QW_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d res cache hash failed", mgmt->cfg.maxResCacheNum);
+ }
+
+ *qWorkerMgmt = mgmt;
+
+ return TSDB_CODE_SUCCESS;
+}
+
+int32_t qWorkerProcessQueryMsg(void *qWorkerMgmt, SSchedulerQueryMsg *msg, SRpcMsg *rsp) {
+ if (NULL == qWorkerMgmt || NULL == msg || NULL == rsp) {
+ return TSDB_CODE_QRY_INVALID_INPUT;
+ }
+
+ SSubplan *plan = NULL;
+ SQWorkerTaskStatus *tStatus = NULL;
+
+ int32_t code = qStringToSubplan(msg->msg, &plan);
+ if (TSDB_CODE_SUCCESS != code) {
+ qError("schId:%"PRIx64",qId:%"PRIx64",taskId:%"PRIx64" string to subplan failed, code:%d", msg->schedulerId, msg->queryId, msg->taskId, code);
+ return code;
+ }
+
+ //TODO call executer to init subquery
+
+ QW_ERR_JRET(qwAddTaskStatus(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, JOB_TASK_STATUS_EXECUTING));
+
+ QW_ERR_JRET(qwBuildRspMsg(NULL, TSDB_MSG_TYPE_QUERY_RSP));
+
+ //TODO call executer to execute subquery
+ code = 0;
+ void *data = NULL;
+ //TODO call executer to execute subquery
+
+ QW_ERR_JRET(qwGetTaskStatus(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, &tStatus));
+
+ tStatus->status = (code) ? JOB_TASK_STATUS_FAILED : JOB_TASK_STATUS_SUCCEED;
+
+ QW_ERR_JRET(qwAddTaskResult(qWorkerMgmt, msg->queryId, msg->taskId, data));
+
+_return:
+
+ if (tStatus && QW_TASK_DONE(tStatus->status) && QW_READY_RECEIVED == tStatus->ready) {
+ QW_ERR_RET(qwBuildRspMsg(NULL, TSDB_MSG_TYPE_RES_READY_RSP));
+ }
+
+ qDestroySubplan(plan);
+
+ return code;
+}
+
+int32_t qWorkerProcessReadyMsg(void *qWorkerMgmt, SSchedulerReadyMsg *msg, SRpcMsg *rsp){
+ if (NULL == qWorkerMgmt || NULL == msg || NULL == rsp) {
+ return TSDB_CODE_QRY_INVALID_INPUT;
+ }
+
+ SQWorkerTaskStatus *tStatus = NULL;
+
+ QW_ERR_RET(qwGetTaskStatus(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, &tStatus));
+
+ if (QW_TASK_DONE(tStatus->status)) {
+ QW_ERR_RET(qwBuildRspMsg(tStatus, TSDB_MSG_TYPE_RES_READY_RSP));
+ } else {
+ tStatus->ready = QW_READY_RECEIVED;
+
+ return TSDB_CODE_SUCCESS;
+ }
+
+ tStatus->ready = QW_READY_RESPONSED;
+
+ return TSDB_CODE_SUCCESS;
+}
+
+int32_t qWorkerProcessStatusMsg(void *qWorkerMgmt, SSchedulerStatusMsg *msg, SRpcMsg *rsp) {
+ if (NULL == qWorkerMgmt || NULL == msg || NULL == rsp) {
+ return TSDB_CODE_QRY_INVALID_INPUT;
+ }
+
+ SSchedulerStatusRsp *sStatus = NULL;
+
+ QW_ERR_RET(qwGetSchTasksStatus(qWorkerMgmt, msg->schedulerId, &sStatus));
+
+ return TSDB_CODE_SUCCESS;
+}
+
+int32_t qWorkerProcessFetchMsg(void *qWorkerMgmt, SSchedulerFetchMsg *msg, SRpcMsg *rsp) {
+ if (NULL == qWorkerMgmt || NULL == msg || NULL == rsp) {
+ return TSDB_CODE_QRY_INVALID_INPUT;
+ }
+
+ QW_ERR_RET(qwUpdateSchLastAccess(qWorkerMgmt, msg->schedulerId));
+
+ void *data = NULL;
+
+ QW_ERR_RET(qwGetTaskResult(qWorkerMgmt, msg->queryId, msg->taskId, &data));
+
+ QW_ERR_RET(qwBuildRspMsg(data, TSDB_MSG_TYPE_FETCH_RSP));
+
+ return TSDB_CODE_SUCCESS;
+}
+
+int32_t qWorkerProcessCancelMsg(void *qWorkerMgmt, SSchedulerCancelMsg *msg, SRpcMsg *rsp);
+
+void qWorkerDestroy(void **qWorkerMgmt) {
+ if (NULL == qWorkerMgmt || NULL == *qWorkerMgmt) {
+ return;
+ }
+
+ SQWorkerMgmt *mgmt = *qWorkerMgmt;
+
+ //TODO STOP ALL QUERY
+
+ //TODO FREE ALL
+
+ tfree(*qWorkerMgmt);
+}
+
+
diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h
index b200df3499cbd3ed121c93d4eee675938b59780e..0234472c706461be1c57ea277a351142abb6a558 100644
--- a/source/libs/scheduler/inc/schedulerInt.h
+++ b/source/libs/scheduler/inc/schedulerInt.h
@@ -31,15 +31,6 @@ extern "C" {
#define SCH_MAX_CONDIDATE_EP_NUM TSDB_MAX_REPLICA
-enum {
- SCH_STATUS_NOT_START = 1,
- SCH_STATUS_EXECUTING,
- SCH_STATUS_SUCCEED,
- SCH_STATUS_FAILED,
- SCH_STATUS_CANCELLING,
- SCH_STATUS_CANCELLED
-};
-
typedef struct SSchedulerMgmt {
uint64_t taskId;
uint64_t schedulerId;
diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c
index 147bd91a22b0069b4276eb92c9638617e03086b8..efbb565578f208cb256960498b44fa1dcdd31d01 100644
--- a/source/libs/scheduler/src/scheduler.c
+++ b/source/libs/scheduler/src/scheduler.c
@@ -161,7 +161,7 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
SArray *levelPlans = NULL;
int32_t levelPlanNum = 0;
- level.status = SCH_STATUS_NOT_START;
+ level.status = JOB_TASK_STATUS_NOT_START;
for (int32_t i = 0; i < levelNum; ++i) {
level.level = i;
@@ -191,7 +191,7 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
task->taskId = atomic_add_fetch_64(&schMgmt.taskId, 1);
task->plan = plan;
- task->status = SCH_STATUS_NOT_START;
+ task->status = JOB_TASK_STATUS_NOT_START;
if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &task, POINTER_BYTES)) {
qError("taosHashPut failed");
@@ -292,14 +292,14 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) {
SSchedulerQueryMsg *pMsg = msg;
- pMsg->schedulerId = schMgmt.schedulerId;
- pMsg->queryId = job->queryId;
- pMsg->taskId = task->taskId;
- pMsg->contentLen = len;
+ pMsg->schedulerId = htobe64(schMgmt.schedulerId);
+ pMsg->queryId = htobe64(job->queryId);
+ pMsg->taskId = htobe64(task->taskId);
+ pMsg->contentLen = htonl(len);
memcpy(pMsg->msg, task->msg, len);
break;
}
- case TSDB_MSG_TYPE_RSP_READY: {
+ case TSDB_MSG_TYPE_RES_READY: {
msgSize = sizeof(SSchedulerReadyMsg);
msg = calloc(1, msgSize);
if (NULL == msg) {
@@ -308,8 +308,8 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) {
}
SSchedulerReadyMsg *pMsg = msg;
- pMsg->queryId = job->queryId;
- pMsg->taskId = task->taskId;
+ pMsg->queryId = htobe64(job->queryId);
+ pMsg->taskId = htobe64(task->taskId);
break;
}
case TSDB_MSG_TYPE_FETCH: {
@@ -321,8 +321,8 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) {
}
SSchedulerFetchMsg *pMsg = msg;
- pMsg->queryId = job->queryId;
- pMsg->taskId = task->taskId;
+ pMsg->queryId = htobe64(job->queryId);
+ pMsg->taskId = htobe64(task->taskId);
break;
}
default:
@@ -365,7 +365,7 @@ _return:
int32_t schProcessOnJobSuccess(SQueryJob *job) {
- job->status = SCH_STATUS_SUCCEED;
+ job->status = JOB_TASK_STATUS_SUCCEED;
if (job->userFetch) {
SCH_ERR_RET(schFetchFromRemote(job));
@@ -375,7 +375,7 @@ int32_t schProcessOnJobSuccess(SQueryJob *job) {
}
int32_t schProcessOnJobFailure(SQueryJob *job) {
- job->status = SCH_STATUS_FAILED;
+ job->status = JOB_TASK_STATUS_FAILED;
atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);
@@ -402,7 +402,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) {
return TSDB_CODE_SUCCESS;
}
- task->status = SCH_STATUS_SUCCEED;
+ task->status = JOB_TASK_STATUS_SUCCEED;
int32_t parentNum = (int32_t)taosArrayGetSize(task->parents);
if (parentNum == 0) {
@@ -448,7 +448,7 @@ int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCod
if (!needRetry) {
SCH_TASK_ERR_LOG("task failed[%x], no more retry", errCode);
- job->status = SCH_STATUS_FAILED;
+ job->status = JOB_TASK_STATUS_FAILED;
SCH_ERR_RET(schProcessOnJobFailure(job));
return TSDB_CODE_SUCCESS;
@@ -467,13 +467,13 @@ int32_t schHandleRspMsg(SQueryJob *job, SQueryTask *task, int32_t msgType, int32
if (rspCode != TSDB_CODE_SUCCESS) {
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode));
} else {
- code = schAsyncSendMsg(job, task, TSDB_MSG_TYPE_RSP_READY);
+ code = schAsyncSendMsg(job, task, TSDB_MSG_TYPE_RES_READY);
if (code) {
goto _task_error;
}
}
break;
- case TSDB_MSG_TYPE_RSP_READY:
+ case TSDB_MSG_TYPE_RES_READY:
if (rspCode != TSDB_CODE_SUCCESS) {
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode));
} else {
@@ -518,7 +518,7 @@ int32_t schLaunchTask(SQueryJob *job, SQueryTask *task) {
SCH_ERR_RET(schPushTaskToExecList(job, task));
- task->status = SCH_STATUS_EXECUTING;
+ task->status = JOB_TASK_STATUS_EXECUTING;
return TSDB_CODE_SUCCESS;
}
@@ -530,7 +530,7 @@ int32_t schLaunchJob(SQueryJob *job) {
SCH_ERR_RET(schLaunchTask(job, task));
}
- job->status = SCH_STATUS_EXECUTING;
+ job->status = JOB_TASK_STATUS_EXECUTING;
return TSDB_CODE_SUCCESS;
}
@@ -590,7 +590,7 @@ int32_t scheduleQueryJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pM
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
- job->status = SCH_STATUS_NOT_START;
+ job->status = JOB_TASK_STATUS_NOT_START;
SCH_ERR_JRET(schLaunchJob(job));
@@ -619,7 +619,7 @@ int32_t scheduleFetchRows(void *pJob, void **data) {
return TSDB_CODE_QRY_APP_ERROR;
}
- if (job->status == SCH_STATUS_SUCCEED) {
+ if (job->status == JOB_TASK_STATUS_SUCCEED) {
SCH_ERR_JRET(schFetchFromRemote(job));
}
@@ -653,7 +653,7 @@ void scheduleFreeJob(void *pJob) {
return;
}
- if (job->status == SCH_STATUS_EXECUTING) {
+ if (job->status == JOB_TASK_STATUS_EXECUTING) {
scheduleCancelJob(pJob);
}
}
diff --git a/source/util/src/thash.c b/source/util/src/thash.c
index bfc6b47fa6abd80245b021954ee908156e9eebeb..c4f6f78106e1ee6703e0429e705f62ab83db1881 100644
--- a/source/util/src/thash.c
+++ b/source/util/src/thash.c
@@ -776,9 +776,16 @@ size_t taosHashGetMemSize(const SHashObj *pHashObj) {
return (pHashObj->capacity * (sizeof(SHashEntry) + POINTER_BYTES)) + sizeof(SHashNode) * taosHashGetSize(pHashObj) + sizeof(SHashObj);
}
-FORCE_INLINE void *taosHashGetDataKey(SHashObj *pHashObj, void *data) {
+FORCE_INLINE int32_t taosHashGetKey(void *data, void** key, size_t* keyLen) {
+ if (NULL == data || NULL == key) {
+ return -1;
+ }
+
SHashNode * node = GET_HASH_PNODE(data);
- return GET_HASH_NODE_KEY(node);
+ *key = GET_HASH_NODE_KEY(node);
+ *keyLen = node->keyLen;
+
+ return 0;
}
FORCE_INLINE uint32_t taosHashGetDataKeyLen(SHashObj *pHashObj, void *data) {