提交 48cc2702 编写于 作者: D dapan1121

feature/qnode

上级 6377bab3
...@@ -1082,6 +1082,7 @@ typedef struct { ...@@ -1082,6 +1082,7 @@ typedef struct {
} SUpdateTagValRsp; } SUpdateTagValRsp;
typedef struct SSchedulerQueryMsg { typedef struct SSchedulerQueryMsg {
uint64_t schedulerId;
uint64_t queryId; uint64_t queryId;
uint64_t taskId; uint64_t taskId;
uint32_t contentLen; uint32_t contentLen;
...@@ -1098,6 +1099,10 @@ typedef struct SSchedulerFetchMsg { ...@@ -1098,6 +1099,10 @@ typedef struct SSchedulerFetchMsg {
uint64_t taskId; uint64_t taskId;
} SSchedulerFetchMsg; } SSchedulerFetchMsg;
typedef struct SSchedulerStatusMsg {
uint64_t schedulerId;
} SSchedulerStatusMsg;
#pragma pack(pop) #pragma pack(pop)
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "trpc.h"
typedef struct { typedef struct {
uint64_t numOfStartTask; uint64_t numOfStartTask;
...@@ -32,48 +32,6 @@ typedef struct { ...@@ -32,48 +32,6 @@ typedef struct {
uint64_t numOfErrors; uint64_t numOfErrors;
} SQnodeStat; } SQnodeStat;
/* start Task msg */
typedef struct {
uint32_t schedulerIp;
uint16_t schedulerPort;
int64_t taskId;
int64_t queryId;
uint32_t srcIp;
uint16_t srcPort;
} SQnodeStartTaskMsg;
/* stop Task msg */
typedef struct {
int64_t taskId;
} SQnodeStopTaskMsg;
/* start/stop Task msg response */
typedef struct {
int64_t taskId;
int32_t code;
} SQnodeTaskRespMsg;
/* Task status msg */
typedef struct {
int64_t taskId;
int32_t status;
int64_t queryId;
} SQnodeTaskStatusMsg;
/* Qnode/Scheduler heartbeat msg */
typedef struct {
int32_t status;
int32_t load;
} SQnodeHeartbeatMsg;
/* Qnode sent/received msg */
typedef struct {
int8_t msgType;
int32_t msgLen;
char msg[];
} SQnodeMsg;
/** /**
* Start one Qnode in Dnode. * Start one Qnode in Dnode.
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_QUERY_WORKER_H_
#define _TD_QUERY_WORKER_H_
#ifdef __cplusplus
extern "C" {
#endif
typedef struct SQWorkerCfg {
uint32_t maxSchedulerNum;
uint32_t maxResCacheNum;
} SQWorkerCfg;
typedef struct {
uint64_t numOfStartTask;
uint64_t numOfStopTask;
uint64_t numOfRecvedFetch;
uint64_t numOfSentHb;
uint64_t numOfSentFetch;
uint64_t numOfTaskInQueue;
uint64_t numOfFetchInQueue;
uint64_t numOfErrors;
} SQWorkerStat;
int32_t qWorkerInit(SQWorkerCfg *cfg);
int32_t qWorkerProcessQueryMsg(char *msg, int32_t msgLen, int32_t *code, char **rspMsg);
#ifdef __cplusplus
}
#endif
#endif /*_TD_QUERY_WORKER_H_*/
...@@ -25,6 +25,7 @@ extern "C" { ...@@ -25,6 +25,7 @@ extern "C" {
typedef struct SSchedulerCfg { typedef struct SSchedulerCfg {
int32_t clusterType; int32_t clusterType;
int32_t maxJobNum;
} SSchedulerCfg; } SSchedulerCfg;
typedef struct SQueryProfileSummary { typedef struct SQueryProfileSummary {
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_QWORKER_INT_H_
#define _TD_QWORKER_INT_H_
#ifdef __cplusplus
extern "C" {
#endif
#define QWORKER_DEFAULT_SCHEDULER_NUMBER 10000
#define QWORKER_DEFAULT_RES_CACHE_NUMBER 10000
typedef struct SQWorkerMgmt {
SQWorkerCfg cfg;
SHashObj *scheduleHash; //key: schedulerId, value:
SHashObj *resHash; //key: queryId+taskId, value:
} SQWorkerMgmt;
#ifdef __cplusplus
}
#endif
#endif /*_TD_QWORKER_INT_H_*/
#include "queryInt.h"
#include "query.h"
#include "taosmsg.h"
#include "queryWorker.h"
#include "qWorkerInt.h"
SQWorkerMgmt qWorkerMgmt = {0};
int32_t qWorkerInit(SQWorkerCfg *cfg) {
if (cfg) {
qWorkerMgmt.cfg = *cfg;
} else {
qWorkerMgmt.cfg.maxSchedulerNum = QWORKER_DEFAULT_SCHEDULER_NUMBER;
qWorkerMgmt.cfg.maxResCacheNum = QWORKER_DEFAULT_RES_CACHE_NUMBER;
}
qWorkerMgmt.scheduleHash = taosHashInit(qWorkerMgmt.cfg.maxSchedulerNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == qWorkerMgmt.scheduleHash) {
SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler hash failed", qWorkerMgmt.cfg.maxSchedulerNum);
}
qWorkerMgmt.resHash = taosHashInit(qWorkerMgmt.cfg.maxResCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (NULL == qWorkerMgmt.resHash) {
taosHashCleanup(qWorkerMgmt.scheduleHash);
qWorkerMgmt.scheduleHash = NULL;
SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d res cache hash failed", qWorkerMgmt.cfg.maxResCacheNum);
}
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessQueryMsg(SSchedulerQueryMsg *msg, int32_t msgLen, int32_t *code, char **rspMsg) {
// TODO
return 0;
}
...@@ -41,7 +41,8 @@ enum { ...@@ -41,7 +41,8 @@ enum {
}; };
typedef struct SSchedulerMgmt { typedef struct SSchedulerMgmt {
uint64_t taskId; uint64_t taskId;
uint64_t schedulerId;
SSchedulerCfg cfg; SSchedulerCfg cfg;
SHashObj *Jobs; // key: queryId, value: SQueryJob* SHashObj *Jobs; // key: queryId, value: SQueryJob*
} SSchedulerMgmt; } SSchedulerMgmt;
......
...@@ -291,6 +291,8 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { ...@@ -291,6 +291,8 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) {
} }
SSchedulerQueryMsg *pMsg = msg; SSchedulerQueryMsg *pMsg = msg;
pMsg->schedulerId = schMgmt.schedulerId;
pMsg->queryId = job->queryId; pMsg->queryId = job->queryId;
pMsg->taskId = task->taskId; pMsg->taskId = task->taskId;
pMsg->contentLen = len; pMsg->contentLen = len;
...@@ -535,15 +537,19 @@ int32_t schLaunchJob(SQueryJob *job) { ...@@ -535,15 +537,19 @@ int32_t schLaunchJob(SQueryJob *job) {
int32_t schedulerInit(SSchedulerCfg *cfg) { int32_t schedulerInit(SSchedulerCfg *cfg) {
schMgmt.Jobs = taosHashInit(SCHEDULE_DEFAULT_JOB_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == schMgmt.Jobs) {
SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", SCHEDULE_DEFAULT_JOB_NUMBER);
}
if (cfg) { if (cfg) {
schMgmt.cfg = *cfg; schMgmt.cfg = *cfg;
} else {
schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER;
} }
schMgmt.Jobs = taosHashInit(schMgmt.cfg.maxJobNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == schMgmt.Jobs) {
SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", schMgmt.cfg.maxJobNum);
}
schMgmt.schedulerId = 1; //TODO GENERATE A UUID
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册