未验证 提交 a630d82c 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #9262 from taosdata/feature/qnode

Feature/qnode
...@@ -51,7 +51,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_QUERY, "mq-query" ) ...@@ -51,7 +51,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_QUERY, "mq-query" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_DISCONNECT, "mq-disconnect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_DISCONNECT, "mq-disconnect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_SET_CUR, "mq-set-cur" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_SET_CUR, "mq-set-cur" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_RSP_READY, "rsp-ready" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_RES_READY, "res-ready" )
// message from client to mnode // message from client to mnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONNECT, "connect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CONNECT, "connect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_ACCT, "create-acct" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CREATE_ACCT, "create-acct" )
...@@ -1094,6 +1094,7 @@ typedef struct { ...@@ -1094,6 +1094,7 @@ typedef struct {
} SUpdateTagValRsp; } SUpdateTagValRsp;
typedef struct SSchedulerQueryMsg { typedef struct SSchedulerQueryMsg {
uint64_t schedulerId;
uint64_t queryId; uint64_t queryId;
uint64_t taskId; uint64_t taskId;
uint32_t contentLen; uint32_t contentLen;
...@@ -1101,15 +1102,39 @@ typedef struct SSchedulerQueryMsg { ...@@ -1101,15 +1102,39 @@ typedef struct SSchedulerQueryMsg {
} SSchedulerQueryMsg; } SSchedulerQueryMsg;
typedef struct SSchedulerReadyMsg { typedef struct SSchedulerReadyMsg {
uint64_t schedulerId;
uint64_t queryId; uint64_t queryId;
uint64_t taskId; uint64_t taskId;
} SSchedulerReadyMsg; } SSchedulerReadyMsg;
typedef struct SSchedulerFetchMsg { typedef struct SSchedulerFetchMsg {
uint64_t schedulerId;
uint64_t queryId; uint64_t queryId;
uint64_t taskId; uint64_t taskId;
} SSchedulerFetchMsg; } SSchedulerFetchMsg;
typedef struct SSchedulerStatusMsg {
uint64_t schedulerId;
} SSchedulerStatusMsg;
typedef struct STaskStatus {
uint64_t queryId;
uint64_t taskId;
int8_t status;
} STaskStatus;
typedef struct SSchedulerStatusRsp {
uint32_t num;
STaskStatus status[];
} SSchedulerStatusRsp;
typedef struct SSchedulerCancelMsg {
uint64_t schedulerId;
uint64_t queryId;
uint64_t taskId;
} SSchedulerCancelMsg;
#pragma pack(pop) #pragma pack(pop)
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "trpc.h"
typedef struct { typedef struct {
uint64_t numOfStartTask; uint64_t numOfStartTask;
...@@ -32,48 +32,6 @@ typedef struct { ...@@ -32,48 +32,6 @@ typedef struct {
uint64_t numOfErrors; uint64_t numOfErrors;
} SQnodeStat; } SQnodeStat;
/* start Task msg */
typedef struct {
uint32_t schedulerIp;
uint16_t schedulerPort;
int64_t taskId;
int64_t queryId;
uint32_t srcIp;
uint16_t srcPort;
} SQnodeStartTaskMsg;
/* stop Task msg */
typedef struct {
int64_t taskId;
} SQnodeStopTaskMsg;
/* start/stop Task msg response */
typedef struct {
int64_t taskId;
int32_t code;
} SQnodeTaskRespMsg;
/* Task status msg */
typedef struct {
int64_t taskId;
int32_t status;
int64_t queryId;
} SQnodeTaskStatusMsg;
/* Qnode/Scheduler heartbeat msg */
typedef struct {
int32_t status;
int32_t load;
} SQnodeHeartbeatMsg;
/* Qnode sent/received msg */
typedef struct {
int8_t msgType;
int32_t msgLen;
char msg[];
} SQnodeMsg;
/** /**
* Start one Qnode in Dnode. * Start one Qnode in Dnode.
......
...@@ -24,6 +24,15 @@ extern "C" { ...@@ -24,6 +24,15 @@ extern "C" {
#include "thash.h" #include "thash.h"
#include "tlog.h" #include "tlog.h"
enum {
JOB_TASK_STATUS_NOT_START = 1,
JOB_TASK_STATUS_EXECUTING,
JOB_TASK_STATUS_SUCCEED,
JOB_TASK_STATUS_FAILED,
JOB_TASK_STATUS_CANCELLING,
JOB_TASK_STATUS_CANCELLED
};
typedef struct STableComInfo { typedef struct STableComInfo {
uint8_t numOfTags; // the number of tags in schema uint8_t numOfTags; // the number of tags in schema
uint8_t precision; // the number of precision uint8_t precision; // the number of precision
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_QWORKER_H_
#define _TD_QWORKER_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "trpc.h"
typedef struct SQWorkerCfg {
uint32_t maxSchedulerNum;
uint32_t maxResCacheNum;
uint32_t maxSchTaskNum;
} SQWorkerCfg;
typedef struct {
uint64_t numOfStartTask;
uint64_t numOfStopTask;
uint64_t numOfRecvedFetch;
uint64_t numOfSentHb;
uint64_t numOfSentFetch;
uint64_t numOfTaskInQueue;
uint64_t numOfFetchInQueue;
uint64_t numOfErrors;
} SQWorkerStat;
int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt);
int32_t qWorkerProcessQueryMsg(void *qWorkerMgmt, SSchedulerQueryMsg *msg, SRpcMsg *rsp);
int32_t qWorkerProcessReadyMsg(void *qWorkerMgmt, SSchedulerReadyMsg *msg, SRpcMsg *rsp);
int32_t qWorkerProcessStatusMsg(void *qWorkerMgmt, SSchedulerStatusMsg *msg, SRpcMsg *rsp);
int32_t qWorkerProcessFetchMsg(void *qWorkerMgmt, SSchedulerFetchMsg *msg, SRpcMsg *rsp);
int32_t qWorkerProcessCancelMsg(void *qWorkerMgmt, SSchedulerCancelMsg *msg, SRpcMsg *rsp);
void qWorkerDestroy(void **qWorkerMgmt);
#ifdef __cplusplus
}
#endif
#endif /*_TD_QWORKER_H_*/
...@@ -25,6 +25,7 @@ extern "C" { ...@@ -25,6 +25,7 @@ extern "C" {
typedef struct SSchedulerCfg { typedef struct SSchedulerCfg {
int32_t clusterType; int32_t clusterType;
int32_t maxJobNum;
} SSchedulerCfg; } SSchedulerCfg;
typedef struct SQueryProfileSummary { typedef struct SQueryProfileSummary {
......
...@@ -193,11 +193,10 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p); ...@@ -193,11 +193,10 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p);
/** /**
* Get the corresponding key information for a given data in hash table * Get the corresponding key information for a given data in hash table
* @param pHashObj
* @param data * @param data
* @return * @return
*/ */
int32_t taosHashGetKey(SHashObj *pHashObj, void *data, void** key, size_t* keyLen); int32_t taosHashGetKey(void *data, void** key, size_t* keyLen);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -11,3 +11,4 @@ add_subdirectory(executor) ...@@ -11,3 +11,4 @@ add_subdirectory(executor)
add_subdirectory(planner) add_subdirectory(planner)
add_subdirectory(function) add_subdirectory(function)
add_subdirectory(qcom) add_subdirectory(qcom)
add_subdirectory(qworker)
...@@ -1175,7 +1175,7 @@ int32_t filterAddGroupUnitFromNode(SFilterInfo *info, tExprNode* tree, SArray *g ...@@ -1175,7 +1175,7 @@ int32_t filterAddGroupUnitFromNode(SFilterInfo *info, tExprNode* tree, SArray *g
void* key = NULL; void* key = NULL;
len = 0; len = 0;
taosHashGetKey((SHashObj *)data, p, &key, &len); taosHashGetKey(p, &key, &len);
void *fdata = NULL; void *fdata = NULL;
if (IS_VAR_DATA_TYPE(type)) { if (IS_VAR_DATA_TYPE(type)) {
......
aux_source_directory(src QWORKER_SRC)
add_library(qworker ${QWORKER_SRC})
target_include_directories(
qworker
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/qworker"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(
qworker
PRIVATE os util transport planner qcom
)
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_QWORKER_INT_H_
#define _TD_QWORKER_INT_H_
#ifdef __cplusplus
extern "C" {
#endif
#define QWORKER_DEFAULT_SCHEDULER_NUMBER 10000
#define QWORKER_DEFAULT_RES_CACHE_NUMBER 10000
#define QWORKER_DEFAULT_SCH_TASK_NUMBER 10000
enum {
QW_READY_NOT_RECEIVED = 0,
QW_READY_RECEIVED,
QW_READY_RESPONSED,
};
typedef struct SQWorkerTaskStatus {
int8_t status;
int8_t ready;
} SQWorkerTaskStatus;
typedef struct SQWorkerResCache {
void *data;
} SQWorkerResCache;
typedef struct SQWorkerSchTaskStatus {
int32_t lastAccessTs; // timestamp in second
SHashObj *taskStatus; // key:queryId+taskId, value: SQWorkerTaskStatus
} SQWorkerSchTaskStatus;
// Qnode/Vnode level task management
typedef struct SQWorkerMgmt {
SQWorkerCfg cfg;
SHashObj *scheduleHash; //key: schedulerId, value: SQWorkerSchTaskStatus
SHashObj *resHash; //key: queryId+taskId, value: SQWorkerResCache
} SQWorkerMgmt;
#define QW_TASK_DONE(status) (status == JOB_TASK_STATUS_SUCCEED || status == JOB_TASK_STATUS_FAILED || status == status == JOB_TASK_STATUS_CANCELLED)
#define QW_SET_QTID(id, qid, tid) do { *(uint64_t *)(id) = (qid); *(uint64_t *)((char *)(id) + sizeof(qid)) = (tid); } while (0)
#define QW_GET_QTID(id, qid, tid) do { (qid) = *(uint64_t *)(id); (tid) = *(uint64_t *)((char *)(id) + sizeof(qid)); } while (0)
#define QW_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define QW_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define QW_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); terrno = _code; return _code; } } while (0)
#define QW_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#ifdef __cplusplus
}
#endif
#endif /*_TD_QWORKER_INT_H_*/
#include "taosmsg.h"
#include "query.h"
#include "qworker.h"
#include "qworkerInt.h"
#include "planner.h"
int32_t qwAddTaskStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, int8_t taskStatus) {
SQWorkerTaskStatus tStatus = {0};
tStatus.status = taskStatus;
char id[sizeof(queryId) + sizeof(taskId)] = {0};
QW_SET_QTID(id, queryId, taskId);
SQWorkerSchTaskStatus *schStatus = taosHashGet(mgmt->scheduleHash, &schedulerId, sizeof(schedulerId));
if (NULL == schStatus) {
SQWorkerSchTaskStatus newSchStatus = {0};
newSchStatus.taskStatus = taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (NULL == newSchStatus.taskStatus) {
qError("taosHashInit %d failed", mgmt->cfg.maxSchTaskNum);
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
if (0 != taosHashPut(newSchStatus.taskStatus, id, sizeof(id), &tStatus, sizeof(tStatus))) {
qError("taosHashPut schedulerId[%"PRIx64"]queryId[%"PRIx64"] taskId[%"PRIx64"] to scheduleHash failed", schedulerId, queryId, taskId);
taosHashCleanup(newSchStatus.taskStatus);
return TSDB_CODE_QRY_APP_ERROR;
}
newSchStatus.lastAccessTs = taosGetTimestampSec();
if (0 != taosHashPut(mgmt->scheduleHash, &schedulerId, sizeof(schedulerId), &newSchStatus, sizeof(newSchStatus))) {
qError("taosHashPut schedulerId[%"PRIx64"] to scheduleHash failed", schedulerId);
taosHashCleanup(newSchStatus.taskStatus);
return TSDB_CODE_QRY_APP_ERROR;
}
return TSDB_CODE_SUCCESS;
}
schStatus->lastAccessTs = taosGetTimestampSec();
if (0 != taosHashPut(schStatus->taskStatus, id, sizeof(id), &tStatus, sizeof(tStatus))) {
qError("taosHashPut schedulerId[%"PRIx64"]queryId[%"PRIx64"] taskId[%"PRIx64"] to scheduleHash failed", schedulerId, queryId, taskId);
return TSDB_CODE_QRY_APP_ERROR;
}
return TSDB_CODE_SUCCESS;
}
int32_t qwGetTaskStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, SQWorkerTaskStatus **taskStatus) {
SQWorkerSchTaskStatus *schStatus = taosHashGet(mgmt->scheduleHash, &schedulerId, sizeof(schedulerId));
if (NULL == schStatus) {
qError("no scheduler for schedulerId[%"PRIx64"]", schedulerId);
return TSDB_CODE_QRY_APP_ERROR;
}
schStatus->lastAccessTs = taosGetTimestampSec();
char id[sizeof(queryId) + sizeof(taskId)] = {0};
QW_SET_QTID(id, queryId, taskId);
SQWorkerTaskStatus *tStatus = taosHashGet(schStatus->taskStatus, id, sizeof(id));
if (NULL == tStatus) {
qError("no task status for schedulerId[%"PRIx64"] queryId[%"PRIx64"] taskId[%"PRIx64"]", schedulerId, queryId, taskId);
return TSDB_CODE_QRY_APP_ERROR;
}
*taskStatus = tStatus;
return TSDB_CODE_SUCCESS;
}
int32_t qwAddTaskResult(SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, void *data) {
char id[sizeof(queryId) + sizeof(taskId)] = {0};
QW_SET_QTID(id, queryId, taskId);
SQWorkerResCache resCache = {0};
resCache.data = data;
if (0 != taosHashPut(mgmt->resHash, id, sizeof(id), &resCache, sizeof(SQWorkerResCache))) {
qError("taosHashPut queryId[%"PRIx64"] taskId[%"PRIx64"] to resHash failed", queryId, taskId);
return TSDB_CODE_QRY_APP_ERROR;
}
return TSDB_CODE_SUCCESS;
}
int32_t qwGetTaskResult(SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, void **data) {
char id[sizeof(queryId) + sizeof(taskId)] = {0};
QW_SET_QTID(id, queryId, taskId);
SQWorkerResCache *resCache = taosHashGet(mgmt->resHash, id, sizeof(id));
if (NULL == resCache) {
qError("no task res for queryId[%"PRIx64"] taskId[%"PRIx64"]", queryId, taskId);
return TSDB_CODE_QRY_APP_ERROR;
}
*data = resCache->data;
return TSDB_CODE_SUCCESS;
}
int32_t qwUpdateSchLastAccess(SQWorkerMgmt *mgmt, uint64_t schedulerId) {
SQWorkerSchTaskStatus *schStatus = taosHashGet(mgmt->scheduleHash, &schedulerId, sizeof(schedulerId));
if (NULL == schStatus) {
qError("no scheduler for schedulerId[%"PRIx64"]", schedulerId);
return TSDB_CODE_QRY_APP_ERROR;
}
schStatus->lastAccessTs = taosGetTimestampSec();
return TSDB_CODE_SUCCESS;
}
int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, SSchedulerStatusRsp **rsp) {
SQWorkerSchTaskStatus *schStatus = taosHashGet(mgmt->scheduleHash, &schedulerId, sizeof(schedulerId));
if (NULL == schStatus) {
qError("no scheduler for schedulerId[%"PRIx64"]", schedulerId);
return TSDB_CODE_QRY_APP_ERROR;
}
schStatus->lastAccessTs = taosGetTimestampSec();
int32_t i = 0;
int32_t taskNum = taosHashGetSize(schStatus->taskStatus);
int32_t size = sizeof(SSchedulerStatusRsp) + sizeof((*rsp)->status[0]) * taskNum;
*rsp = calloc(1, size);
if (NULL == *rsp) {
qError("calloc %d failed", size);
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
void *key = NULL;
size_t keyLen = 0;
void *pIter = taosHashIterate(schStatus->taskStatus, NULL);
while (pIter) {
SQWorkerTaskStatus *taskStatus = (SQWorkerTaskStatus *)pIter;
taosHashGetKey(pIter, &key, &keyLen);
QW_GET_QTID(key, (*rsp)->status[i].queryId, (*rsp)->status[i].taskId);
(*rsp)->status[i].status = taskStatus->status;
pIter = taosHashIterate(schStatus->taskStatus, pIter);
}
(*rsp)->num = taskNum;
return TSDB_CODE_SUCCESS;
}
int32_t qwBuildRspMsg(void *data, int32_t msgType);
int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt) {
SQWorkerMgmt *mgmt = calloc(1, sizeof(SQWorkerMgmt));
if (NULL == mgmt) {
qError("calloc %d failed", (int32_t)sizeof(SQWorkerMgmt));
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
if (cfg) {
mgmt->cfg = *cfg;
} else {
mgmt->cfg.maxSchedulerNum = QWORKER_DEFAULT_SCHEDULER_NUMBER;
mgmt->cfg.maxResCacheNum = QWORKER_DEFAULT_RES_CACHE_NUMBER;
mgmt->cfg.maxSchTaskNum = QWORKER_DEFAULT_SCH_TASK_NUMBER;
}
mgmt->scheduleHash = taosHashInit(mgmt->cfg.maxSchedulerNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == mgmt->scheduleHash) {
tfree(mgmt);
QW_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler hash failed", mgmt->cfg.maxSchedulerNum);
}
mgmt->resHash = taosHashInit(mgmt->cfg.maxResCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (NULL == mgmt->resHash) {
taosHashCleanup(mgmt->scheduleHash);
mgmt->scheduleHash = NULL;
tfree(mgmt);
QW_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d res cache hash failed", mgmt->cfg.maxResCacheNum);
}
*qWorkerMgmt = mgmt;
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessQueryMsg(void *qWorkerMgmt, SSchedulerQueryMsg *msg, SRpcMsg *rsp) {
if (NULL == qWorkerMgmt || NULL == msg || NULL == rsp) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
SSubplan *plan = NULL;
SQWorkerTaskStatus *tStatus = NULL;
int32_t code = qStringToSubplan(msg->msg, &plan);
if (TSDB_CODE_SUCCESS != code) {
qError("schId:%"PRIx64",qId:%"PRIx64",taskId:%"PRIx64" string to subplan failed, code:%d", msg->schedulerId, msg->queryId, msg->taskId, code);
return code;
}
//TODO call executer to init subquery
QW_ERR_JRET(qwAddTaskStatus(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, JOB_TASK_STATUS_EXECUTING));
QW_ERR_JRET(qwBuildRspMsg(NULL, TSDB_MSG_TYPE_QUERY_RSP));
//TODO call executer to execute subquery
code = 0;
void *data = NULL;
//TODO call executer to execute subquery
QW_ERR_JRET(qwGetTaskStatus(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, &tStatus));
tStatus->status = (code) ? JOB_TASK_STATUS_FAILED : JOB_TASK_STATUS_SUCCEED;
QW_ERR_JRET(qwAddTaskResult(qWorkerMgmt, msg->queryId, msg->taskId, data));
_return:
if (tStatus && QW_TASK_DONE(tStatus->status) && QW_READY_RECEIVED == tStatus->ready) {
QW_ERR_RET(qwBuildRspMsg(NULL, TSDB_MSG_TYPE_RES_READY_RSP));
}
qDestroySubplan(plan);
return code;
}
int32_t qWorkerProcessReadyMsg(void *qWorkerMgmt, SSchedulerReadyMsg *msg, SRpcMsg *rsp){
if (NULL == qWorkerMgmt || NULL == msg || NULL == rsp) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
SQWorkerTaskStatus *tStatus = NULL;
QW_ERR_RET(qwGetTaskStatus(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, &tStatus));
if (QW_TASK_DONE(tStatus->status)) {
QW_ERR_RET(qwBuildRspMsg(tStatus, TSDB_MSG_TYPE_RES_READY_RSP));
} else {
tStatus->ready = QW_READY_RECEIVED;
return TSDB_CODE_SUCCESS;
}
tStatus->ready = QW_READY_RESPONSED;
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessStatusMsg(void *qWorkerMgmt, SSchedulerStatusMsg *msg, SRpcMsg *rsp) {
if (NULL == qWorkerMgmt || NULL == msg || NULL == rsp) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
SSchedulerStatusRsp *sStatus = NULL;
QW_ERR_RET(qwGetSchTasksStatus(qWorkerMgmt, msg->schedulerId, &sStatus));
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessFetchMsg(void *qWorkerMgmt, SSchedulerFetchMsg *msg, SRpcMsg *rsp) {
if (NULL == qWorkerMgmt || NULL == msg || NULL == rsp) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
QW_ERR_RET(qwUpdateSchLastAccess(qWorkerMgmt, msg->schedulerId));
void *data = NULL;
QW_ERR_RET(qwGetTaskResult(qWorkerMgmt, msg->queryId, msg->taskId, &data));
QW_ERR_RET(qwBuildRspMsg(data, TSDB_MSG_TYPE_FETCH_RSP));
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessCancelMsg(void *qWorkerMgmt, SSchedulerCancelMsg *msg, SRpcMsg *rsp);
void qWorkerDestroy(void **qWorkerMgmt) {
if (NULL == qWorkerMgmt || NULL == *qWorkerMgmt) {
return;
}
SQWorkerMgmt *mgmt = *qWorkerMgmt;
//TODO STOP ALL QUERY
//TODO FREE ALL
tfree(*qWorkerMgmt);
}
...@@ -31,17 +31,9 @@ extern "C" { ...@@ -31,17 +31,9 @@ extern "C" {
#define SCH_MAX_CONDIDATE_EP_NUM TSDB_MAX_REPLICA #define SCH_MAX_CONDIDATE_EP_NUM TSDB_MAX_REPLICA
enum {
SCH_STATUS_NOT_START = 1,
SCH_STATUS_EXECUTING,
SCH_STATUS_SUCCEED,
SCH_STATUS_FAILED,
SCH_STATUS_CANCELLING,
SCH_STATUS_CANCELLED
};
typedef struct SSchedulerMgmt { typedef struct SSchedulerMgmt {
uint64_t taskId; uint64_t taskId;
uint64_t schedulerId;
SSchedulerCfg cfg; SSchedulerCfg cfg;
SHashObj *Jobs; // key: queryId, value: SQueryJob* SHashObj *Jobs; // key: queryId, value: SQueryJob*
} SSchedulerMgmt; } SSchedulerMgmt;
......
...@@ -161,7 +161,7 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { ...@@ -161,7 +161,7 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
SArray *levelPlans = NULL; SArray *levelPlans = NULL;
int32_t levelPlanNum = 0; int32_t levelPlanNum = 0;
level.status = SCH_STATUS_NOT_START; level.status = JOB_TASK_STATUS_NOT_START;
for (int32_t i = 0; i < levelNum; ++i) { for (int32_t i = 0; i < levelNum; ++i) {
level.level = i; level.level = i;
...@@ -191,7 +191,7 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { ...@@ -191,7 +191,7 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
task.taskId = atomic_add_fetch_64(&schMgmt.taskId, 1); task.taskId = atomic_add_fetch_64(&schMgmt.taskId, 1);
task.plan = plan; task.plan = plan;
task.status = SCH_STATUS_NOT_START; task.status = JOB_TASK_STATUS_NOT_START;
void *p = taosArrayPush(level.subTasks, &task); void *p = taosArrayPush(level.subTasks, &task);
if (NULL == p) { if (NULL == p) {
...@@ -304,13 +304,15 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { ...@@ -304,13 +304,15 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) {
} }
SSchedulerQueryMsg *pMsg = msg; SSchedulerQueryMsg *pMsg = msg;
pMsg->queryId = job->queryId;
pMsg->taskId = task->taskId; pMsg->schedulerId = htobe64(schMgmt.schedulerId);
pMsg->contentLen = len; pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = htobe64(task->taskId);
pMsg->contentLen = htonl(len);
memcpy(pMsg->msg, task->msg, len); memcpy(pMsg->msg, task->msg, len);
break; break;
} }
case TSDB_MSG_TYPE_RSP_READY: { case TSDB_MSG_TYPE_RES_READY: {
msgSize = sizeof(SSchedulerReadyMsg); msgSize = sizeof(SSchedulerReadyMsg);
msg = calloc(1, msgSize); msg = calloc(1, msgSize);
if (NULL == msg) { if (NULL == msg) {
...@@ -319,8 +321,8 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { ...@@ -319,8 +321,8 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) {
} }
SSchedulerReadyMsg *pMsg = msg; SSchedulerReadyMsg *pMsg = msg;
pMsg->queryId = job->queryId; pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = task->taskId; pMsg->taskId = htobe64(task->taskId);
break; break;
} }
case TSDB_MSG_TYPE_FETCH: { case TSDB_MSG_TYPE_FETCH: {
...@@ -332,8 +334,8 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { ...@@ -332,8 +334,8 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) {
} }
SSchedulerFetchMsg *pMsg = msg; SSchedulerFetchMsg *pMsg = msg;
pMsg->queryId = job->queryId; pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = task->taskId; pMsg->taskId = htobe64(task->taskId);
break; break;
} }
default: default:
...@@ -376,7 +378,7 @@ _return: ...@@ -376,7 +378,7 @@ _return:
int32_t schProcessOnJobSuccess(SQueryJob *job) { int32_t schProcessOnJobSuccess(SQueryJob *job) {
job->status = SCH_STATUS_SUCCEED; job->status = JOB_TASK_STATUS_SUCCEED;
if (job->userFetch) { if (job->userFetch) {
SCH_ERR_RET(schFetchFromRemote(job)); SCH_ERR_RET(schFetchFromRemote(job));
...@@ -386,7 +388,7 @@ int32_t schProcessOnJobSuccess(SQueryJob *job) { ...@@ -386,7 +388,7 @@ int32_t schProcessOnJobSuccess(SQueryJob *job) {
} }
int32_t schProcessOnJobFailure(SQueryJob *job) { int32_t schProcessOnJobFailure(SQueryJob *job) {
job->status = SCH_STATUS_FAILED; job->status = JOB_TASK_STATUS_FAILED;
atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0); atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);
...@@ -413,7 +415,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { ...@@ -413,7 +415,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
task->status = SCH_STATUS_SUCCEED; task->status = JOB_TASK_STATUS_SUCCEED;
int32_t parentNum = (int32_t)taosArrayGetSize(task->parents); int32_t parentNum = (int32_t)taosArrayGetSize(task->parents);
if (parentNum == 0) { if (parentNum == 0) {
...@@ -459,7 +461,7 @@ int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCod ...@@ -459,7 +461,7 @@ int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCod
if (!needRetry) { if (!needRetry) {
SCH_TASK_ERR_LOG("task failed[%x], no more retry", errCode); SCH_TASK_ERR_LOG("task failed[%x], no more retry", errCode);
job->status = SCH_STATUS_FAILED; job->status = JOB_TASK_STATUS_FAILED;
SCH_ERR_RET(schProcessOnJobFailure(job)); SCH_ERR_RET(schProcessOnJobFailure(job));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -478,13 +480,13 @@ int32_t schHandleRspMsg(SQueryJob *job, SQueryTask *task, int32_t msgType, int32 ...@@ -478,13 +480,13 @@ int32_t schHandleRspMsg(SQueryJob *job, SQueryTask *task, int32_t msgType, int32
if (rspCode != TSDB_CODE_SUCCESS) { if (rspCode != TSDB_CODE_SUCCESS) {
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode)); SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode));
} else { } else {
code = schAsyncSendMsg(job, task, TSDB_MSG_TYPE_RSP_READY); code = schAsyncSendMsg(job, task, TSDB_MSG_TYPE_RES_READY);
if (code) { if (code) {
goto _task_error; goto _task_error;
} }
} }
break; break;
case TSDB_MSG_TYPE_RSP_READY: case TSDB_MSG_TYPE_RES_READY:
if (rspCode != TSDB_CODE_SUCCESS) { if (rspCode != TSDB_CODE_SUCCESS) {
SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode)); SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode));
} else { } else {
...@@ -534,7 +536,7 @@ int32_t schLaunchTask(SQueryJob *job, SQueryTask *task) { ...@@ -534,7 +536,7 @@ int32_t schLaunchTask(SQueryJob *job, SQueryTask *task) {
SCH_ERR_RET(schPushTaskToExecList(job, task)); SCH_ERR_RET(schPushTaskToExecList(job, task));
task->status = SCH_STATUS_EXECUTING; task->status = JOB_TASK_STATUS_EXECUTING;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -546,22 +548,26 @@ int32_t schLaunchJob(SQueryJob *job) { ...@@ -546,22 +548,26 @@ int32_t schLaunchJob(SQueryJob *job) {
SCH_ERR_RET(schLaunchTask(job, task)); SCH_ERR_RET(schLaunchTask(job, task));
} }
job->status = SCH_STATUS_EXECUTING; job->status = JOB_TASK_STATUS_EXECUTING;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t schedulerInit(SSchedulerCfg *cfg) { int32_t schedulerInit(SSchedulerCfg *cfg) {
schMgmt.Jobs = taosHashInit(SCHEDULE_DEFAULT_JOB_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == schMgmt.Jobs) {
SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", SCHEDULE_DEFAULT_JOB_NUMBER);
}
if (cfg) { if (cfg) {
schMgmt.cfg = *cfg; schMgmt.cfg = *cfg;
} else {
schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER;
} }
schMgmt.Jobs = taosHashInit(schMgmt.cfg.maxJobNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == schMgmt.Jobs) {
SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", schMgmt.cfg.maxJobNum);
}
schMgmt.schedulerId = 1; //TODO GENERATE A UUID
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -605,7 +611,7 @@ int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, voi ...@@ -605,7 +611,7 @@ int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, voi
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
} }
job->status = SCH_STATUS_NOT_START; job->status = JOB_TASK_STATUS_NOT_START;
SCH_ERR_JRET(schLaunchJob(job)); SCH_ERR_JRET(schLaunchJob(job));
...@@ -634,7 +640,7 @@ int32_t scheduleFetchRows(void *pJob, void **data) { ...@@ -634,7 +640,7 @@ int32_t scheduleFetchRows(void *pJob, void **data) {
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
if (job->status == SCH_STATUS_SUCCEED) { if (job->status == JOB_TASK_STATUS_SUCCEED) {
SCH_ERR_JRET(schFetchFromRemote(job)); SCH_ERR_JRET(schFetchFromRemote(job));
} }
...@@ -668,7 +674,7 @@ void scheduleFreeJob(void *pJob) { ...@@ -668,7 +674,7 @@ void scheduleFreeJob(void *pJob) {
return; return;
} }
if (job->status == SCH_STATUS_EXECUTING) { if (job->status == JOB_TASK_STATUS_EXECUTING) {
scheduleCancelJob(pJob); scheduleCancelJob(pJob);
} }
} }
......
...@@ -776,9 +776,16 @@ size_t taosHashGetMemSize(const SHashObj *pHashObj) { ...@@ -776,9 +776,16 @@ size_t taosHashGetMemSize(const SHashObj *pHashObj) {
return (pHashObj->capacity * (sizeof(SHashEntry) + POINTER_BYTES)) + sizeof(SHashNode) * taosHashGetSize(pHashObj) + sizeof(SHashObj); return (pHashObj->capacity * (sizeof(SHashEntry) + POINTER_BYTES)) + sizeof(SHashNode) * taosHashGetSize(pHashObj) + sizeof(SHashObj);
} }
FORCE_INLINE void *taosHashGetDataKey(SHashObj *pHashObj, void *data) { FORCE_INLINE int32_t taosHashGetKey(void *data, void** key, size_t* keyLen) {
if (NULL == data || NULL == key) {
return -1;
}
SHashNode * node = GET_HASH_PNODE(data); SHashNode * node = GET_HASH_PNODE(data);
return GET_HASH_NODE_KEY(node); *key = GET_HASH_NODE_KEY(node);
*keyLen = node->keyLen;
return 0;
} }
FORCE_INLINE uint32_t taosHashGetDataKeyLen(SHashObj *pHashObj, void *data) { FORCE_INLINE uint32_t taosHashGetDataKeyLen(SHashObj *pHashObj, void *data) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册