diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 11a38a3742e049d11a8f06bca7f429d896c91e37..ee4af6041ea0c4f6a0f5ecc800c1edd798c84cb9 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -1082,6 +1082,7 @@ typedef struct { } SUpdateTagValRsp; typedef struct SSchedulerQueryMsg { + uint64_t schedulerId; uint64_t queryId; uint64_t taskId; uint32_t contentLen; @@ -1098,6 +1099,10 @@ typedef struct SSchedulerFetchMsg { uint64_t taskId; } SSchedulerFetchMsg; +typedef struct SSchedulerStatusMsg { + uint64_t schedulerId; +} SSchedulerStatusMsg; + #pragma pack(pop) diff --git a/include/dnode/qnode/qnode.h b/include/dnode/qnode/qnode.h index 65779b209938bc9766f2d0d11881c2354d0c5092..aa4c3af392dfb2e94198d0062801fe405624752c 100644 --- a/include/dnode/qnode/qnode.h +++ b/include/dnode/qnode/qnode.h @@ -19,7 +19,7 @@ #ifdef __cplusplus extern "C" { #endif - +#include "trpc.h" typedef struct { uint64_t numOfStartTask; @@ -32,48 +32,6 @@ typedef struct { uint64_t numOfErrors; } SQnodeStat; -/* start Task msg */ -typedef struct { - uint32_t schedulerIp; - uint16_t schedulerPort; - int64_t taskId; - int64_t queryId; - uint32_t srcIp; - uint16_t srcPort; -} SQnodeStartTaskMsg; - -/* stop Task msg */ -typedef struct { - int64_t taskId; -} SQnodeStopTaskMsg; - -/* start/stop Task msg response */ -typedef struct { - int64_t taskId; - int32_t code; -} SQnodeTaskRespMsg; - -/* Task status msg */ -typedef struct { - int64_t taskId; - int32_t status; - int64_t queryId; -} SQnodeTaskStatusMsg; - -/* Qnode/Scheduler heartbeat msg */ -typedef struct { - int32_t status; - int32_t load; - -} SQnodeHeartbeatMsg; - -/* Qnode sent/received msg */ -typedef struct { - int8_t msgType; - int32_t msgLen; - char msg[]; -} SQnodeMsg; - /** * Start one Qnode in Dnode. diff --git a/include/libs/qcom/queryWorker.h b/include/libs/qcom/queryWorker.h new file mode 100644 index 0000000000000000000000000000000000000000..9a3d63ae65c8a9410f69d10ccba024f17a924477 --- /dev/null +++ b/include/libs/qcom/queryWorker.h @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_QUERY_WORKER_H_ +#define _TD_QUERY_WORKER_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct SQWorkerCfg { + uint32_t maxSchedulerNum; + uint32_t maxResCacheNum; +} SQWorkerCfg; + +typedef struct { + uint64_t numOfStartTask; + uint64_t numOfStopTask; + uint64_t numOfRecvedFetch; + uint64_t numOfSentHb; + uint64_t numOfSentFetch; + uint64_t numOfTaskInQueue; + uint64_t numOfFetchInQueue; + uint64_t numOfErrors; +} SQWorkerStat; + + +int32_t qWorkerInit(SQWorkerCfg *cfg); + +int32_t qWorkerProcessQueryMsg(char *msg, int32_t msgLen, int32_t *code, char **rspMsg); + + + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_QUERY_WORKER_H_*/ diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index c01b79d7ff7dfe3df4b614dfac742636bf3346a6..180f78a0852240f01c3326f8743ddf7e209846b5 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -25,6 +25,7 @@ extern "C" { typedef struct SSchedulerCfg { int32_t clusterType; + int32_t maxJobNum; } SSchedulerCfg; typedef struct SQueryProfileSummary { diff --git a/source/libs/qcom/inc/qWorkerInt.h b/source/libs/qcom/inc/qWorkerInt.h new file mode 100644 index 0000000000000000000000000000000000000000..ce87e18c6aabf1dd64a52149a9860eb7e83dcf84 --- /dev/null +++ b/source/libs/qcom/inc/qWorkerInt.h @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_QWORKER_INT_H_ +#define _TD_QWORKER_INT_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#define QWORKER_DEFAULT_SCHEDULER_NUMBER 10000 +#define QWORKER_DEFAULT_RES_CACHE_NUMBER 10000 + +typedef struct SQWorkerMgmt { + SQWorkerCfg cfg; + SHashObj *scheduleHash; //key: schedulerId, value: + SHashObj *resHash; //key: queryId+taskId, value: +} SQWorkerMgmt; + + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_QWORKER_INT_H_*/ diff --git a/source/libs/qcom/src/queryWorker.c b/source/libs/qcom/src/queryWorker.c new file mode 100644 index 0000000000000000000000000000000000000000..580a340834e403a74bce6a4da26bc42c2a73b8e2 --- /dev/null +++ b/source/libs/qcom/src/queryWorker.c @@ -0,0 +1,38 @@ +#include "queryInt.h" +#include "query.h" +#include "taosmsg.h" +#include "queryWorker.h" +#include "qWorkerInt.h" + +SQWorkerMgmt qWorkerMgmt = {0}; + +int32_t qWorkerInit(SQWorkerCfg *cfg) { + if (cfg) { + qWorkerMgmt.cfg = *cfg; + } else { + qWorkerMgmt.cfg.maxSchedulerNum = QWORKER_DEFAULT_SCHEDULER_NUMBER; + qWorkerMgmt.cfg.maxResCacheNum = QWORKER_DEFAULT_RES_CACHE_NUMBER; + } + + qWorkerMgmt.scheduleHash = taosHashInit(qWorkerMgmt.cfg.maxSchedulerNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); + if (NULL == qWorkerMgmt.scheduleHash) { + SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler hash failed", qWorkerMgmt.cfg.maxSchedulerNum); + } + + qWorkerMgmt.resHash = taosHashInit(qWorkerMgmt.cfg.maxResCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + if (NULL == qWorkerMgmt.resHash) { + taosHashCleanup(qWorkerMgmt.scheduleHash); + qWorkerMgmt.scheduleHash = NULL; + + SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d res cache hash failed", qWorkerMgmt.cfg.maxResCacheNum); + } + + return TSDB_CODE_SUCCESS; +} + +int32_t qWorkerProcessQueryMsg(SSchedulerQueryMsg *msg, int32_t msgLen, int32_t *code, char **rspMsg) { + + // TODO + return 0; +} + diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index db75fc4fdd49336b818b9b57cedf3d0f772817c0..b200df3499cbd3ed121c93d4eee675938b59780e 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -41,7 +41,8 @@ enum { }; typedef struct SSchedulerMgmt { - uint64_t taskId; + uint64_t taskId; + uint64_t schedulerId; SSchedulerCfg cfg; SHashObj *Jobs; // key: queryId, value: SQueryJob* } SSchedulerMgmt; diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 2327fc5b0464bf07add7e3086a362d650cdb8ed6..147bd91a22b0069b4276eb92c9638617e03086b8 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -291,6 +291,8 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { } SSchedulerQueryMsg *pMsg = msg; + + pMsg->schedulerId = schMgmt.schedulerId; pMsg->queryId = job->queryId; pMsg->taskId = task->taskId; pMsg->contentLen = len; @@ -535,15 +537,19 @@ int32_t schLaunchJob(SQueryJob *job) { int32_t schedulerInit(SSchedulerCfg *cfg) { - schMgmt.Jobs = taosHashInit(SCHEDULE_DEFAULT_JOB_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); - if (NULL == schMgmt.Jobs) { - SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", SCHEDULE_DEFAULT_JOB_NUMBER); - } - if (cfg) { schMgmt.cfg = *cfg; + } else { + schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER; } + schMgmt.Jobs = taosHashInit(schMgmt.cfg.maxJobNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); + if (NULL == schMgmt.Jobs) { + SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", schMgmt.cfg.maxJobNum); + } + + schMgmt.schedulerId = 1; //TODO GENERATE A UUID + return TSDB_CODE_SUCCESS; }