未验证 提交 0e521586 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #9356 from taosdata/feature/qnode

Feature/qnode
...@@ -587,10 +587,6 @@ typedef struct { ...@@ -587,10 +587,6 @@ typedef struct {
typedef struct { typedef struct {
int32_t code; int32_t code;
union {
uint64_t qhandle;
uint64_t qId;
}; // query handle
} SQueryTableRsp; } SQueryTableRsp;
// todo: the show handle should be replaced with id // todo: the show handle should be replaced with id
...@@ -1121,6 +1117,10 @@ typedef struct SResReadyMsg { ...@@ -1121,6 +1117,10 @@ typedef struct SResReadyMsg {
uint64_t taskId; uint64_t taskId;
} SResReadyMsg; } SResReadyMsg;
typedef struct SResReadyRsp {
int32_t code;
} SResReadyRsp;
typedef struct SResFetchMsg { typedef struct SResFetchMsg {
uint64_t schedulerId; uint64_t schedulerId;
uint64_t queryId; uint64_t queryId;
...@@ -1149,12 +1149,20 @@ typedef struct STaskCancelMsg { ...@@ -1149,12 +1149,20 @@ typedef struct STaskCancelMsg {
uint64_t taskId; uint64_t taskId;
} STaskCancelMsg; } STaskCancelMsg;
typedef struct STaskCancelRsp {
int32_t code;
} STaskCancelRsp;
typedef struct STaskDropMsg { typedef struct STaskDropMsg {
uint64_t schedulerId; uint64_t schedulerId;
uint64_t queryId; uint64_t queryId;
uint64_t taskId; uint64_t taskId;
} STaskDropMsg; } STaskDropMsg;
typedef struct STaskDropRsp {
int32_t code;
} STaskDropRsp;
#pragma pack(pop) #pragma pack(pop)
......
...@@ -42,17 +42,17 @@ typedef struct { ...@@ -42,17 +42,17 @@ typedef struct {
int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt); int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt);
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg **rsp); int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp); int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp); int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp); int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp); int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp); int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
void qWorkerDestroy(void **qWorkerMgmt); void qWorkerDestroy(void **qWorkerMgmt);
......
...@@ -15,9 +15,10 @@ target_link_libraries( ...@@ -15,9 +15,10 @@ target_link_libraries(
PUBLIC wal PUBLIC wal
PUBLIC sync PUBLIC sync
PUBLIC cjson PUBLIC cjson
PUBLIC qworker
) )
# test # test
if(${BUILD_TEST}) if(${BUILD_TEST})
add_subdirectory(test) add_subdirectory(test)
endif(${BUILD_TEST}) endif(${BUILD_TEST})
\ No newline at end of file
...@@ -34,6 +34,7 @@ ...@@ -34,6 +34,7 @@
#include "vnodeRequest.h" #include "vnodeRequest.h"
#include "vnodeStateMgr.h" #include "vnodeStateMgr.h"
#include "vnodeSync.h" #include "vnodeSync.h"
#include "vnodeQuery.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -72,6 +73,7 @@ struct SVnode { ...@@ -72,6 +73,7 @@ struct SVnode {
SVnodeSync* pSync; SVnodeSync* pSync;
SVnodeFS* pFs; SVnodeFS* pFs;
tsem_t canCommit; tsem_t canCommit;
void* pQuery;
}; };
int vnodeScheduleTask(SVnodeTask* task); int vnodeScheduleTask(SVnodeTask* task);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_VNODE_READ_H_
#define _TD_VNODE_READ_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "vnodeInt.h"
#include "qworker.h"
int vnodeQueryOpen(SVnode *pVnode);
#ifdef __cplusplus
}
#endif
#endif /*_TD_VNODE_READ_H_*/
...@@ -24,16 +24,6 @@ int32_t vnodeSync(SVnode *pVnode) { return 0; } ...@@ -24,16 +24,6 @@ int32_t vnodeSync(SVnode *pVnode) { return 0; }
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { return 0; } int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { return 0; }
int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
vInfo("query message is processed");
return 0;
}
int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
vInfo("fetch message is processed");
return 0;
}
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
vInfo("sync message is processed"); vInfo("sync message is processed");
return 0; return 0;
......
...@@ -127,6 +127,11 @@ static int vnodeOpenImpl(SVnode *pVnode) { ...@@ -127,6 +127,11 @@ static int vnodeOpenImpl(SVnode *pVnode) {
return -1; return -1;
} }
// Open Query
if (vnodeQueryOpen(pVnode)) {
return -1;
}
// TODO // TODO
return 0; return 0;
} }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeDef.h"
#include "vnodeQuery.h"
int vnodeQueryOpen(SVnode *pVnode) {
return qWorkerInit(NULL, &pVnode->pQuery);
}
int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
vInfo("query message is processed");
qWorkerProcessQueryMsg(pVnode, pVnode->pQuery, pMsg);
return 0;
}
int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
vInfo("fetch message is processed");
qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg);
return 0;
}
...@@ -13,8 +13,8 @@ ...@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef _TD_META_QUERY_H_ #ifndef _VNODE_QUERY_H_
#define _TD_META_QUERY_H_ #define _VNODE_QUERY_H_
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -24,4 +24,4 @@ extern "C" { ...@@ -24,4 +24,4 @@ extern "C" {
} }
#endif #endif
#endif /*_TD_META_QUERY_H_*/ #endif /*_VNODE_QUERY_H_*/
\ No newline at end of file \ No newline at end of file
...@@ -553,28 +553,122 @@ _return: ...@@ -553,28 +553,122 @@ _return:
int32_t qwBuildAndSendQueryRsp(SRpcMsg *pMsg, int32_t code) { int32_t qwBuildAndSendQueryRsp(SRpcMsg *pMsg, int32_t code) {
SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp));
pRsp->code = code;
SRpcMsg rpcRsp = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.pCont = pRsp,
.contLen = sizeof(*pRsp),
.code = code,
};
rpcSendResponse(&rpcRsp);
return TSDB_CODE_SUCCESS;
} }
int32_t qwBuildAndSendReadyRsp(SRpcMsg *pMsg, int32_t code) { int32_t qwBuildAndSendReadyRsp(SRpcMsg *pMsg, int32_t code) {
SResReadyRsp *pRsp = (SResReadyRsp *)rpcMallocCont(sizeof(SResReadyRsp));
pRsp->code = code;
SRpcMsg rpcRsp = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.pCont = pRsp,
.contLen = sizeof(*pRsp),
.code = code,
};
rpcSendResponse(&rpcRsp);
return TSDB_CODE_SUCCESS;
} }
int32_t qwBuildAndSendStatusRsp(SRpcMsg *pMsg, SSchedulerStatusRsp *sStatus) { int32_t qwBuildAndSendStatusRsp(SRpcMsg *pMsg, SSchedulerStatusRsp *sStatus) {
int32_t size = 0;
if (sStatus) {
size = sizeof(SSchedulerStatusRsp) + sizeof(sStatus->status[0]) * sStatus->num;
} else {
size = sizeof(SSchedulerStatusRsp);
}
SSchedulerStatusRsp *pRsp = (SSchedulerStatusRsp *)rpcMallocCont(size);
if (sStatus) {
memcpy(pRsp, sStatus, size);
} else {
pRsp->num = 0;
}
SRpcMsg rpcRsp = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.pCont = pRsp,
.contLen = size,
.code = 0,
};
rpcSendResponse(&rpcRsp);
return TSDB_CODE_SUCCESS;
} }
int32_t qwBuildAndSendFetchRsp(SRpcMsg *pMsg, void *data) { int32_t qwBuildAndSendFetchRsp(SRpcMsg *pMsg, void *data) {
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
memset(pRsp, 0, sizeof(SRetrieveTableRsp));
//TODO fill msg
pRsp->completed = true;
SRpcMsg rpcRsp = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.pCont = pRsp,
.contLen = sizeof(*pRsp),
.code = 0,
};
rpcSendResponse(&rpcRsp);
return TSDB_CODE_SUCCESS;
} }
int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg) { int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg, int32_t code) {
STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp));
pRsp->code = code;
SRpcMsg rpcRsp = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.pCont = pRsp,
.contLen = sizeof(*pRsp),
.code = code,
};
rpcSendResponse(&rpcRsp);
return TSDB_CODE_SUCCESS;
} }
int32_t qwBuildAndSendDropRsp(SRpcMsg *pMsg) { int32_t qwBuildAndSendDropRsp(SRpcMsg *pMsg, int32_t code) {
STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp));
pRsp->code = code;
SRpcMsg rpcRsp = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.pCont = pRsp,
.contLen = sizeof(*pRsp),
.code = code,
};
rpcSendResponse(&rpcRsp);
return TSDB_CODE_SUCCESS;
} }
...@@ -712,8 +806,10 @@ int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t schedu ...@@ -712,8 +806,10 @@ int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t schedu
SQWorkerSchStatus *sch = NULL; SQWorkerSchStatus *sch = NULL;
SQWorkerTaskStatus *task = NULL; SQWorkerTaskStatus *task = NULL;
int32_t code = 0; int32_t code = 0;
int32_t needRsp = true;
void *data = NULL;
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)); QW_ERR_JRET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch));
QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task)); QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task));
QW_LOCK(QW_READ, &task->lock); QW_LOCK(QW_READ, &task->lock);
...@@ -724,7 +820,7 @@ int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t schedu ...@@ -724,7 +820,7 @@ int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t schedu
} }
if (QW_GOT_RES_DATA(res->data)) { if (QW_GOT_RES_DATA(res->data)) {
QW_ERR_JRET(qwBuildAndSendFetchRsp(pMsg, res->data)); data = res->data;
if (QW_LOW_RES_DATA(res->data)) { if (QW_LOW_RES_DATA(res->data)) {
if (task->status == JOB_TASK_STATUS_PARTIAL_SUCCEED) { if (task->status == JOB_TASK_STATUS_PARTIAL_SUCCEED) {
//TODO add query back to queue //TODO add query back to queue
...@@ -737,6 +833,8 @@ int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t schedu ...@@ -737,6 +833,8 @@ int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t schedu
} }
//TODO SET FLAG FOR QUERY TO SEND RSP WHEN RES READY //TODO SET FLAG FOR QUERY TO SEND RSP WHEN RES READY
needRsp = false;
} }
_return: _return:
...@@ -746,9 +844,12 @@ _return: ...@@ -746,9 +844,12 @@ _return:
if (sch) { if (sch) {
qwReleaseTask(QW_READ, sch); qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
} }
qwReleaseScheduler(QW_READ, mgmt); if (needRsp) {
qwBuildAndSendFetchRsp(pMsg, res->data);
}
QW_RET(code); QW_RET(code);
} }
...@@ -832,13 +933,14 @@ int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt) { ...@@ -832,13 +933,14 @@ int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg **rsp) { int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == rsp) { if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
SSubQueryMsg *msg = pMsg->pCont; SSubQueryMsg *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
qError("invalid query msg");
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
...@@ -851,7 +953,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRp ...@@ -851,7 +953,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRp
QW_ERR_JRET(qwCheckTaskCancelDrop(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, &needStop)); QW_ERR_JRET(qwCheckTaskCancelDrop(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, &needStop));
if (needStop) { if (needStop) {
qWarn("task need stop"); qWarn("task need stop");
QW_ERR_RET(TSDB_CODE_QRY_TASK_CANCELLED); QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
} }
code = qStringToSubplan(msg->msg, &plan); code = qStringToSubplan(msg->msg, &plan);
...@@ -910,13 +1012,14 @@ _return: ...@@ -910,13 +1012,14 @@ _return:
QW_RET(code); QW_RET(code);
} }
int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp){ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == rsp) { if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT; return TSDB_CODE_QRY_INVALID_INPUT;
} }
SResReadyMsg *msg = pMsg->pCont; SResReadyMsg *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
qError("invalid task status msg");
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
...@@ -925,27 +1028,31 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRp ...@@ -925,27 +1028,31 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRp
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp) { int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == rsp) { if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT; return TSDB_CODE_QRY_INVALID_INPUT;
} }
int32_t code = 0;
SSchTasksStatusMsg *msg = pMsg->pCont; SSchTasksStatusMsg *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
qError("invalid task status msg");
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
SSchedulerStatusRsp *sStatus = NULL; SSchedulerStatusRsp *sStatus = NULL;
QW_ERR_RET(qwGetSchTasksStatus(qWorkerMgmt, msg->schedulerId, &sStatus)); QW_ERR_JRET(qwGetSchTasksStatus(qWorkerMgmt, msg->schedulerId, &sStatus));
_return:
QW_ERR_RET(qwBuildAndSendStatusRsp(pMsg, sStatus)); QW_ERR_RET(qwBuildAndSendStatusRsp(pMsg, sStatus));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp) { int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == rsp) { if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT; return TSDB_CODE_QRY_INVALID_INPUT;
} }
...@@ -971,36 +1078,44 @@ _return: ...@@ -971,36 +1078,44 @@ _return:
QW_RET(code); QW_RET(code);
} }
int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp) { int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == rsp) { if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT; return TSDB_CODE_QRY_INVALID_INPUT;
} }
int32_t code = 0;
STaskCancelMsg *msg = pMsg->pCont; STaskCancelMsg *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
qError("invalid task cancel msg");
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
QW_ERR_RET(qwCancelTask(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId)); QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId));
QW_ERR_RET(qwBuildAndSendCancelRsp(pMsg)); _return:
QW_ERR_RET(qwBuildAndSendCancelRsp(pMsg, code));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp) { int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == rsp) { if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT; return TSDB_CODE_QRY_INVALID_INPUT;
} }
int32_t code = 0;
STaskDropMsg *msg = pMsg->pCont; STaskDropMsg *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
qError("invalid task drop msg");
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
QW_ERR_RET(qwCancelDropTask(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId)); QW_ERR_JRET(qwCancelDropTask(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId));
_return:
QW_ERR_RET(qwBuildAndSendDropRsp(pMsg)); QW_ERR_RET(qwBuildAndSendDropRsp(pMsg, code));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -31,17 +31,35 @@ extern "C" { ...@@ -31,17 +31,35 @@ extern "C" {
#define SCH_MAX_CONDIDATE_EP_NUM TSDB_MAX_REPLICA #define SCH_MAX_CONDIDATE_EP_NUM TSDB_MAX_REPLICA
enum {
SCH_READ = 1,
SCH_WRITE,
};
typedef struct SSchedulerMgmt { typedef struct SSchedulerMgmt {
uint64_t taskId; uint64_t taskId;
uint64_t schedulerId; uint64_t schedulerId;
SSchedulerCfg cfg; SSchedulerCfg cfg;
SHashObj *Jobs; // key: queryId, value: SQueryJob* SHashObj *jobs; // key: queryId, value: SQueryJob*
} SSchedulerMgmt; } SSchedulerMgmt;
typedef struct SQueryLevel {
int32_t level;
int8_t status;
SRWLatch lock;
int32_t taskFailed;
int32_t taskSucceed;
int32_t taskNum;
SArray *subTasks; // Element is SQueryTask
} SQueryLevel;
typedef struct SQueryTask { typedef struct SQueryTask {
uint64_t taskId; // task id uint64_t taskId; // task id
SQueryLevel *level; // level
SSubplan *plan; // subplan SSubplan *plan; // subplan
char *msg; // operator tree char *msg; // operator tree
int32_t msgLen; // msg length
int8_t status; // task status int8_t status; // task status
SEpAddr execAddr; // task actual executed node address SEpAddr execAddr; // task actual executed node address
SQueryProfileSummary summary; // task execution summary SQueryProfileSummary summary; // task execution summary
...@@ -50,21 +68,14 @@ typedef struct SQueryTask { ...@@ -50,21 +68,14 @@ typedef struct SQueryTask {
SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask* SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask*
} SQueryTask; } SQueryTask;
typedef struct SQueryLevel {
int32_t level;
int8_t status;
int32_t taskNum;
SArray *subTasks; // Element is SQueryTask
} SQueryLevel;
typedef struct SQueryJob { typedef struct SQueryJob {
uint64_t queryId; uint64_t queryId;
int32_t levelNum; int32_t levelNum;
int32_t levelIdx; int32_t levelIdx;
int8_t status; int8_t status;
SQueryProfileSummary summary; SQueryProfileSummary summary;
SEpSet dataSrcEps; SEpSet dataSrcEps;
SEpAddr resEp; SEpAddr resEp;
void *transport; void *transport;
SArray *qnodeList; SArray *qnodeList;
tsem_t rspSem; tsem_t rspSem;
...@@ -74,6 +85,7 @@ typedef struct SQueryJob { ...@@ -74,6 +85,7 @@ typedef struct SQueryJob {
SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask* SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask*
SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask* SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask*
SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask*
SArray *levels; // Element is SQueryLevel, starting from 0. SArray *levels; // Element is SQueryLevel, starting from 0.
SArray *subPlans; // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0. SArray *subPlans; // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0.
...@@ -81,7 +93,8 @@ typedef struct SQueryJob { ...@@ -81,7 +93,8 @@ typedef struct SQueryJob {
#define SCH_HAS_QNODE_IN_CLUSTER(type) (false) //TODO CLUSTER TYPE #define SCH_HAS_QNODE_IN_CLUSTER(type) (false) //TODO CLUSTER TYPE
#define SCH_TASK_READY_TO_LUNCH(task) ((task)->childReady >= taosArrayGetSize((task)->children)) // MAY NEED TO ENHANCE #define SCH_TASK_READY_TO_LUNCH(task) ((task)->childReady >= taosArrayGetSize((task)->children)) // MAY NEED TO ENHANCE
#define SCH_IS_DATA_SRC_TASK(task) (task->plan->type == QUERY_TYPE_SCAN) #define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->type == QUERY_TYPE_SCAN)
#define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->type == QUERY_TYPE_MODIFY)
#define SCH_JOB_ERR_LOG(param, ...) qError("QID:%"PRIx64 param, job->queryId, __VA_ARGS__) #define SCH_JOB_ERR_LOG(param, ...) qError("QID:%"PRIx64 param, job->queryId, __VA_ARGS__)
#define SCH_TASK_ERR_LOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64 param, job->queryId, task->taskId, __VA_ARGS__) #define SCH_TASK_ERR_LOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64 param, job->queryId, task->taskId, __VA_ARGS__)
...@@ -91,6 +104,9 @@ typedef struct SQueryJob { ...@@ -91,6 +104,9 @@ typedef struct SQueryJob {
#define SCH_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); terrno = _code; return _code; } } while (0) #define SCH_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); terrno = _code; return _code; } } while (0)
#define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) #define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#define SCH_LOCK(type, _lock) (SCH_READ == (type) ? taosRLockLatch(_lock) : taosWLockLatch(_lock))
#define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock))
extern int32_t schLaunchTask(SQueryJob *job, SQueryTask *task); extern int32_t schLaunchTask(SQueryJob *job, SQueryTask *task);
......
...@@ -160,11 +160,19 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { ...@@ -160,11 +160,19 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
SQueryLevel level = {0}; SQueryLevel level = {0};
SArray *levelPlans = NULL; SArray *levelPlans = NULL;
int32_t levelPlanNum = 0; int32_t levelPlanNum = 0;
SQueryLevel *pLevel = NULL;
level.status = JOB_TASK_STATUS_NOT_START; level.status = JOB_TASK_STATUS_NOT_START;
for (int32_t i = 0; i < levelNum; ++i) { for (int32_t i = 0; i < levelNum; ++i) {
level.level = i; if (NULL == taosArrayPush(job->levels, &level)) {
qError("taosArrayPush failed");
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pLevel = taosArrayGet(job->levels, i);
pLevel->level = i;
levelPlans = taosArrayGetP(dag->pSubplans, i); levelPlans = taosArrayGetP(dag->pSubplans, i);
if (NULL == levelPlans) { if (NULL == levelPlans) {
qError("no level plans for level %d", i); qError("no level plans for level %d", i);
...@@ -177,10 +185,10 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { ...@@ -177,10 +185,10 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
} }
level.taskNum = levelPlanNum; pLevel->taskNum = levelPlanNum;
level.subTasks = taosArrayInit(levelPlanNum, sizeof(SQueryTask)); pLevel->subTasks = taosArrayInit(levelPlanNum, sizeof(SQueryTask));
if (NULL == level.subTasks) { if (NULL == pLevel->subTasks) {
qError("taosArrayInit %d failed", levelPlanNum); qError("taosArrayInit %d failed", levelPlanNum);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
...@@ -191,9 +199,10 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { ...@@ -191,9 +199,10 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
task.taskId = atomic_add_fetch_64(&schMgmt.taskId, 1); task.taskId = atomic_add_fetch_64(&schMgmt.taskId, 1);
task.plan = plan; task.plan = plan;
task.level = pLevel;
task.status = JOB_TASK_STATUS_NOT_START; task.status = JOB_TASK_STATUS_NOT_START;
void *p = taosArrayPush(level.subTasks, &task); void *p = taosArrayPush(pLevel->subTasks, &task);
if (NULL == p) { if (NULL == p) {
qError("taosArrayPush failed"); qError("taosArrayPush failed");
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
...@@ -205,10 +214,6 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { ...@@ -205,10 +214,6 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
} }
} }
if (NULL == taosArrayPush(job->levels, &level)) {
qError("taosArrayPush failed");
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
} }
SCH_ERR_JRET(schBuildTaskRalation(job, planToTask)); SCH_ERR_JRET(schBuildTaskRalation(job, planToTask));
...@@ -220,8 +225,8 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { ...@@ -220,8 +225,8 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_return: _return:
if (level.subTasks) { if (pLevel->subTasks) {
taosArrayDestroy(level.subTasks); taosArrayDestroy(pLevel->subTasks);
} }
if (planToTask) { if (planToTask) {
...@@ -273,7 +278,23 @@ int32_t schMoveTaskToSuccList(SQueryJob *job, SQueryTask *task, bool *moved) { ...@@ -273,7 +278,23 @@ int32_t schMoveTaskToSuccList(SQueryJob *job, SQueryTask *task, bool *moved) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (0 != taosHashPut(job->execTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) { if (0 != taosHashPut(job->succTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) {
qError("taosHashPut failed");
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
*moved = true;
return TSDB_CODE_SUCCESS;
}
int32_t schMoveTaskToFailList(SQueryJob *job, SQueryTask *task, bool *moved) {
if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) {
qWarn("remove task[%"PRIx64"] from execTasks failed", task->taskId);
return TSDB_CODE_SUCCESS;
}
if (0 != taosHashPut(job->failTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) {
qError("taosHashPut failed"); qError("taosHashPut failed");
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
...@@ -289,14 +310,23 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { ...@@ -289,14 +310,23 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) {
void *msg = NULL; void *msg = NULL;
switch (msgType) { switch (msgType) {
case TSDB_MSG_TYPE_SUBMIT: {
if (NULL == task->msg || task->msgLen <= 0) {
qError("submit msg is NULL");
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
}
msgSize = task->msgLen;
msg = task->msg;
break;
}
case TSDB_MSG_TYPE_QUERY: { case TSDB_MSG_TYPE_QUERY: {
if (NULL == task->msg) { if (NULL == task->msg) {
qError("query msg is NULL"); qError("query msg is NULL");
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
} }
int32_t len = strlen(task->msg); msgSize = sizeof(SSubQueryMsg) + task->msgLen;
msgSize = sizeof(SSubQueryMsg) + len + 1;
msg = calloc(1, msgSize); msg = calloc(1, msgSize);
if (NULL == msg) { if (NULL == msg) {
qError("calloc %d failed", msgSize); qError("calloc %d failed", msgSize);
...@@ -308,11 +338,10 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { ...@@ -308,11 +338,10 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) {
pMsg->schedulerId = htobe64(schMgmt.schedulerId); pMsg->schedulerId = htobe64(schMgmt.schedulerId);
pMsg->queryId = htobe64(job->queryId); pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = htobe64(task->taskId); pMsg->taskId = htobe64(task->taskId);
pMsg->contentLen = htonl(len); pMsg->contentLen = htonl(task->msgLen);
memcpy(pMsg->msg, task->msg, len); memcpy(pMsg->msg, task->msg, task->msgLen);
pMsg->msg[len] = 0;
break; break;
} }
case TSDB_MSG_TYPE_RES_READY: { case TSDB_MSG_TYPE_RES_READY: {
msgSize = sizeof(SResReadyMsg); msgSize = sizeof(SResReadyMsg);
msg = calloc(1, msgSize); msg = calloc(1, msgSize);
...@@ -322,6 +351,7 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { ...@@ -322,6 +351,7 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) {
} }
SResReadyMsg *pMsg = msg; SResReadyMsg *pMsg = msg;
pMsg->schedulerId = htobe64(schMgmt.schedulerId);
pMsg->queryId = htobe64(job->queryId); pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = htobe64(task->taskId); pMsg->taskId = htobe64(task->taskId);
break; break;
...@@ -335,6 +365,21 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { ...@@ -335,6 +365,21 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) {
} }
SResFetchMsg *pMsg = msg; SResFetchMsg *pMsg = msg;
pMsg->schedulerId = htobe64(schMgmt.schedulerId);
pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = htobe64(task->taskId);
break;
}
case TSDB_MSG_TYPE_DROP_TASK:{
msgSize = sizeof(STaskDropMsg);
msg = calloc(1, msgSize);
if (NULL == msg) {
qError("calloc %d failed", msgSize);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
STaskDropMsg *pMsg = msg;
pMsg->schedulerId = htobe64(schMgmt.schedulerId);
pMsg->queryId = htobe64(job->queryId); pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = htobe64(task->taskId); pMsg->taskId = htobe64(task->taskId);
break; break;
...@@ -345,6 +390,7 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { ...@@ -345,6 +390,7 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) {
} }
//TODO SEND MSG //TODO SEND MSG
//taosAsyncExec(rpcSendRequest(void * shandle, const SEpSet * pEpSet, SRpcMsg * pMsg, int64_t * pRid), p, &code);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -425,8 +471,29 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { ...@@ -425,8 +471,29 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
strncpy(job->resEp.fqdn, task->execAddr.fqdn, sizeof(job->resEp.fqdn)); int32_t taskDone = 0;
job->resEp.port = task->execAddr.port;
if (SCH_TASK_NEED_WAIT_ALL(task)) {
SCH_LOCK(SCH_WRITE, &task->level->lock);
task->level->taskFailed++;
taskDone = task->level->taskSucceed + task->level->taskFailed;
SCH_UNLOCK(SCH_WRITE, &task->level->lock);
if (taskDone < task->level->taskNum) {
qDebug("wait all tasks, done:%d, all:%d", taskDone, task->level->taskNum);
return TSDB_CODE_SUCCESS;
}
if (task->level->taskFailed > 0) {
job->status = JOB_TASK_STATUS_FAILED;
SCH_ERR_RET(schProcessOnJobFailure(job));
return TSDB_CODE_SUCCESS;
}
} else {
strncpy(job->resEp.fqdn, task->execAddr.fqdn, sizeof(job->resEp.fqdn));
job->resEp.port = task->execAddr.port;
}
SCH_ERR_RET(schProcessOnJobSuccess(job)); SCH_ERR_RET(schProcessOnJobSuccess(job));
...@@ -457,10 +524,30 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { ...@@ -457,10 +524,30 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) {
int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCode) { int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCode) {
bool needRetry = false; bool needRetry = false;
bool moved = false;
int32_t taskDone = 0;
SCH_ERR_RET(schTaskCheckAndSetRetry(job, task, errCode, &needRetry)); SCH_ERR_RET(schTaskCheckAndSetRetry(job, task, errCode, &needRetry));
if (!needRetry) { if (!needRetry) {
SCH_TASK_ERR_LOG("task failed[%x], no more retry", errCode); SCH_TASK_ERR_LOG("task failed[%x], no more retry", errCode);
SCH_ERR_RET(schMoveTaskToFailList(job, task, &moved));
if (!moved) {
SCH_TASK_ERR_LOG("task may already moved, status:%d", task->status);
return TSDB_CODE_SUCCESS;
}
if (SCH_TASK_NEED_WAIT_ALL(task)) {
SCH_LOCK(SCH_WRITE, &task->level->lock);
task->level->taskFailed++;
taskDone = task->level->taskSucceed + task->level->taskFailed;
SCH_UNLOCK(SCH_WRITE, &task->level->lock);
if (taskDone < task->level->taskNum) {
qDebug("wait all tasks, done:%d, all:%d", taskDone, task->level->taskNum);
return TSDB_CODE_SUCCESS;
}
}
job->status = JOB_TASK_STATUS_FAILED; job->status = JOB_TASK_STATUS_FAILED;
SCH_ERR_RET(schProcessOnJobFailure(job)); SCH_ERR_RET(schProcessOnJobFailure(job));
...@@ -522,8 +609,7 @@ _return: ...@@ -522,8 +609,7 @@ _return:
int32_t schLaunchTask(SQueryJob *job, SQueryTask *task) { int32_t schLaunchTask(SQueryJob *job, SQueryTask *task) {
SSubplan *plan = task->plan; SSubplan *plan = task->plan;
int32_t len = 0; SCH_ERR_RET(qSubPlanToString(plan, &task->msg, &task->msgLen));
SCH_ERR_RET(qSubPlanToString(plan, &task->msg, &len));
if (plan->execEpSet.numOfEps <= 0) { if (plan->execEpSet.numOfEps <= 0) {
SCH_ERR_RET(schSetTaskExecEpSet(job, &plan->execEpSet)); SCH_ERR_RET(schSetTaskExecEpSet(job, &plan->execEpSet));
} }
...@@ -532,8 +618,10 @@ int32_t schLaunchTask(SQueryJob *job, SQueryTask *task) { ...@@ -532,8 +618,10 @@ int32_t schLaunchTask(SQueryJob *job, SQueryTask *task) {
SCH_TASK_ERR_LOG("invalid execEpSet num:%d", plan->execEpSet.numOfEps); SCH_TASK_ERR_LOG("invalid execEpSet num:%d", plan->execEpSet.numOfEps);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
} }
int32_t msgType = (plan->type == QUERY_TYPE_MODIFY) ? TSDB_MSG_TYPE_SUBMIT : TSDB_MSG_TYPE_QUERY;
SCH_ERR_RET(schAsyncSendMsg(job, task, TSDB_MSG_TYPE_QUERY)); SCH_ERR_RET(schAsyncSendMsg(job, task, msgType));
SCH_ERR_RET(schPushTaskToExecList(job, task)); SCH_ERR_RET(schPushTaskToExecList(job, task));
...@@ -554,6 +642,25 @@ int32_t schLaunchJob(SQueryJob *job) { ...@@ -554,6 +642,25 @@ int32_t schLaunchJob(SQueryJob *job) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void schDropJobAllTasks(SQueryJob *job) {
void *pIter = taosHashIterate(job->succTasks, NULL);
while (pIter) {
SQueryTask *task = *(SQueryTask **)pIter;
schAsyncSendMsg(job, task, TSDB_MSG_TYPE_DROP_TASK);
pIter = taosHashIterate(job->succTasks, pIter);
}
pIter = taosHashIterate(job->failTasks, NULL);
while (pIter) {
SQueryTask *task = *(SQueryTask **)pIter;
schAsyncSendMsg(job, task, TSDB_MSG_TYPE_DROP_TASK);
pIter = taosHashIterate(job->succTasks, pIter);
}
}
int32_t schedulerInit(SSchedulerCfg *cfg) { int32_t schedulerInit(SSchedulerCfg *cfg) {
if (cfg) { if (cfg) {
...@@ -562,8 +669,8 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { ...@@ -562,8 +669,8 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER; schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER;
} }
schMgmt.Jobs = taosHashInit(schMgmt.cfg.maxJobNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); schMgmt.jobs = taosHashInit(schMgmt.cfg.maxJobNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == schMgmt.Jobs) { if (NULL == schMgmt.jobs) {
SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", schMgmt.cfg.maxJobNum); SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", schMgmt.cfg.maxJobNum);
} }
...@@ -605,9 +712,15 @@ int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, voi ...@@ -605,9 +712,15 @@ int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, voi
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
job->failTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == job->failTasks) {
qError("taosHashInit %d failed", pDag->numOfSubplans);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
tsem_init(&job->rspSem, 0, 0); tsem_init(&job->rspSem, 0, 0);
if (0 != taosHashPut(schMgmt.Jobs, &job->queryId, sizeof(job->queryId), &job, POINTER_BYTES)) { if (0 != taosHashPut(schMgmt.jobs, &job->queryId, sizeof(job->queryId), &job, POINTER_BYTES)) {
qError("taosHashPut queryId:%"PRIx64" failed", job->queryId); qError("taosHashPut queryId:%"PRIx64" failed", job->queryId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
} }
...@@ -659,6 +772,8 @@ _return: ...@@ -659,6 +772,8 @@ _return:
int32_t scheduleCancelJob(void *pJob) { int32_t scheduleCancelJob(void *pJob) {
//TODO //TODO
//TODO MOVE ALL TASKS FROM EXEC LIST TO FAIL LIST
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -670,7 +785,7 @@ void scheduleFreeJob(void *pJob) { ...@@ -670,7 +785,7 @@ void scheduleFreeJob(void *pJob) {
SQueryJob *job = pJob; SQueryJob *job = pJob;
if (job->status > 0) { if (job->status > 0) {
if (0 != taosHashRemove(schMgmt.Jobs, &job->queryId, sizeof(job->queryId))) { if (0 != taosHashRemove(schMgmt.jobs, &job->queryId, sizeof(job->queryId))) {
qError("remove job:%"PRIx64"from mgmt failed", job->queryId); // maybe already freed qError("remove job:%"PRIx64"from mgmt failed", job->queryId); // maybe already freed
return; return;
} }
...@@ -678,15 +793,17 @@ void scheduleFreeJob(void *pJob) { ...@@ -678,15 +793,17 @@ void scheduleFreeJob(void *pJob) {
if (job->status == JOB_TASK_STATUS_EXECUTING) { if (job->status == JOB_TASK_STATUS_EXECUTING) {
scheduleCancelJob(pJob); scheduleCancelJob(pJob);
} }
schDropJobAllTasks(job);
} }
//TODO free job //TODO free job
} }
void schedulerDestroy(void) { void schedulerDestroy(void) {
if (schMgmt.Jobs) { if (schMgmt.jobs) {
taosHashCleanup(schMgmt.Jobs); //TODO taosHashCleanup(schMgmt.jobs); //TODO
schMgmt.Jobs = NULL; schMgmt.jobs = NULL;
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册