未验证 提交 7b8e19da 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #9804 from taosdata/feature/qnode

Feature/qnode
...@@ -1033,6 +1033,13 @@ typedef struct { ...@@ -1033,6 +1033,13 @@ typedef struct {
uint64_t taskId; uint64_t taskId;
} SSinkDataReq; } SSinkDataReq;
typedef struct {
SMsgHead header;
uint64_t sId;
uint64_t queryId;
uint64_t taskId;
} SQueryContinueReq;
typedef struct { typedef struct {
SMsgHead header; SMsgHead header;
......
...@@ -170,6 +170,8 @@ enum { ...@@ -170,6 +170,8 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TOPIC, "vnode-drop-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_TOPIC, "vnode-drop-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SHOW_TABLES, "vnode-show-tables", SVShowTablesReq, SVShowTablesRsp) TD_DEF_MSG_TYPE(TDMT_VND_SHOW_TABLES, "vnode-show-tables", SVShowTablesReq, SVShowTablesRsp)
TD_DEF_MSG_TYPE(TDMT_VND_SHOW_TABLES_FETCH, "vnode-show-tables-fetch", SVShowTablesFetchReq, SVShowTablesFetchRsp) TD_DEF_MSG_TYPE(TDMT_VND_SHOW_TABLES_FETCH, "vnode-show-tables-fetch", SVShowTablesFetchReq, SVShowTablesFetchRsp)
TD_DEF_MSG_TYPE(TDMT_VND_QUERY_CONTINUE, "vnode-query-continue", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SCHEDULE_DATA_SINK, "vnode-schedule-data-sink", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp)
......
...@@ -22,9 +22,18 @@ extern "C" { ...@@ -22,9 +22,18 @@ extern "C" {
#include "trpc.h" #include "trpc.h"
enum {
NODE_TYPE_VNODE = 1,
NODE_TYPE_QNODE,
NODE_TYPE_SNODE,
};
typedef struct SQWorkerCfg { typedef struct SQWorkerCfg {
uint32_t maxSchedulerNum; uint32_t maxSchedulerNum;
uint32_t maxResCacheNum; uint32_t maxTaskNum;
uint32_t maxSchTaskNum; uint32_t maxSchTaskNum;
} SQWorkerCfg; } SQWorkerCfg;
...@@ -39,11 +48,17 @@ typedef struct { ...@@ -39,11 +48,17 @@ typedef struct {
uint64_t numOfErrors; uint64_t numOfErrors;
} SQWorkerStat; } SQWorkerStat;
typedef int32_t (*putReqToQueryQFp)(void *, struct SRpcMsg *);
int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt);
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, void *nodeObj, putReqToQueryQFp fp);
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessQueryContinueMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessDataSinkMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
......
...@@ -22,6 +22,7 @@ extern "C" { ...@@ -22,6 +22,7 @@ extern "C" {
#include "qworker.h" #include "qworker.h"
#include "vnode.h" #include "vnode.h"
typedef struct SQWorkerMgmt SQHandle; typedef struct SQWorkerMgmt SQHandle;
int vnodeQueryOpen(SVnode *pVnode); int vnodeQueryOpen(SVnode *pVnode);
......
...@@ -19,11 +19,22 @@ ...@@ -19,11 +19,22 @@
static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg); static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg);
static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int vnodeQueryOpen(SVnode *pVnode) { return qWorkerInit(NULL, &pVnode->pQuery); } int vnodeQueryOpen(SVnode *pVnode) { return qWorkerInit(NODE_TYPE_VNODE, pVnode->vgId, NULL, &pVnode->pQuery, pVnode, vnodePutReqToVQueryQ); }
int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
vTrace("query message is processed"); vTrace("query message is processing");
return qWorkerProcessQueryMsg(pVnode->pTsdb, pVnode->pQuery, pMsg);
switch (pMsg->msgType) {
case TDMT_VND_QUERY:
return qWorkerProcessQueryMsg(pVnode->pTsdb, pVnode->pQuery, pMsg);
case TDMT_VND_QUERY_CONTINUE:
return qWorkerProcessQueryContinueMsg(pVnode->pTsdb, pVnode->pQuery, pMsg);
case TDMT_VND_SCHEDULE_DATA_SINK:
return qWorkerProcessDataSinkMsg(pVnode->pTsdb, pVnode->pQuery, pMsg);
default:
vError("unknown msg type:%d in query queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR;
}
} }
int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
......
...@@ -178,8 +178,10 @@ int32_t qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) { ...@@ -178,8 +178,10 @@ int32_t qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) {
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC);
int64_t st = 0; int64_t st = 0;
*handle = pTaskInfo->dsHandle; if (handle) {
*handle = pTaskInfo->dsHandle;
}
while(1) { while(1) {
st = taosGetTimestampUs(); st = taosGetTimestampUs();
SSDataBlock* pRes = pTaskInfo->pRoot->exec(pTaskInfo->pRoot, &newgroup); SSDataBlock* pRes = pTaskInfo->pRoot->exec(pTaskInfo->pRoot, &newgroup);
......
...@@ -23,7 +23,7 @@ extern "C" { ...@@ -23,7 +23,7 @@ extern "C" {
#include "tlockfree.h" #include "tlockfree.h"
#define QWORKER_DEFAULT_SCHEDULER_NUMBER 10000 #define QWORKER_DEFAULT_SCHEDULER_NUMBER 10000
#define QWORKER_DEFAULT_RES_CACHE_NUMBER 10000 #define QWORKER_DEFAULT_TASK_NUMBER 10000
#define QWORKER_DEFAULT_SCH_TASK_NUMBER 10000 #define QWORKER_DEFAULT_SCH_TASK_NUMBER 10000
enum { enum {
...@@ -57,7 +57,6 @@ enum { ...@@ -57,7 +57,6 @@ enum {
QW_ADD_ACQUIRE, QW_ADD_ACQUIRE,
}; };
typedef struct SQWTaskStatus { typedef struct SQWTaskStatus {
SRWLatch lock; SRWLatch lock;
int32_t code; int32_t code;
...@@ -67,12 +66,15 @@ typedef struct SQWTaskStatus { ...@@ -67,12 +66,15 @@ typedef struct SQWTaskStatus {
bool drop; bool drop;
} SQWTaskStatus; } SQWTaskStatus;
typedef struct SQWorkerTaskHandlesCache { typedef struct SQWTaskCtx {
SRWLatch lock; SRWLatch lock;
int8_t sinkScheduled;
int8_t queryScheduled;
bool needRsp; bool needRsp;
qTaskInfo_t taskHandle; qTaskInfo_t taskHandle;
DataSinkHandle sinkHandle; DataSinkHandle sinkHandle;
} SQWorkerTaskHandlesCache; } SQWTaskCtx;
typedef struct SQWSchStatus { typedef struct SQWSchStatus {
int32_t lastAccessTs; // timestamp in second int32_t lastAccessTs; // timestamp in second
...@@ -82,11 +84,15 @@ typedef struct SQWSchStatus { ...@@ -82,11 +84,15 @@ typedef struct SQWSchStatus {
// Qnode/Vnode level task management // Qnode/Vnode level task management
typedef struct SQWorkerMgmt { typedef struct SQWorkerMgmt {
SQWorkerCfg cfg; SQWorkerCfg cfg;
SRWLatch schLock; int8_t nodeType;
SRWLatch resLock; int32_t nodeId;
SHashObj *schHash; //key: schedulerId, value: SQWSchStatus SRWLatch schLock;
SHashObj *resHash; //key: queryId+taskId, value: SQWorkerResCache SRWLatch ctxLock;
SHashObj *schHash; //key: schedulerId, value: SQWSchStatus
SHashObj *ctxHash; //key: queryId+taskId, value: SQWTaskCtx
void *nodeObj;
putReqToQueryQFp putToQueueFp;
} SQWorkerMgmt; } SQWorkerMgmt;
#define QW_GOT_RES_DATA(data) (true) #define QW_GOT_RES_DATA(data) (true)
...@@ -94,41 +100,69 @@ typedef struct SQWorkerMgmt { ...@@ -94,41 +100,69 @@ typedef struct SQWorkerMgmt {
#define QW_TASK_NOT_EXIST(code) (TSDB_CODE_QRY_SCH_NOT_EXIST == (code) || TSDB_CODE_QRY_TASK_NOT_EXIST == (code)) #define QW_TASK_NOT_EXIST(code) (TSDB_CODE_QRY_SCH_NOT_EXIST == (code) || TSDB_CODE_QRY_TASK_NOT_EXIST == (code))
#define QW_TASK_ALREADY_EXIST(code) (TSDB_CODE_QRY_TASK_ALREADY_EXIST == (code)) #define QW_TASK_ALREADY_EXIST(code) (TSDB_CODE_QRY_TASK_ALREADY_EXIST == (code))
#define QW_TASK_READY_RESP(status) (status == JOB_TASK_STATUS_SUCCEED || status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED || status == JOB_TASK_STATUS_PARTIAL_SUCCEED) #define QW_TASK_READY(status) (status == JOB_TASK_STATUS_SUCCEED || status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED || status == JOB_TASK_STATUS_PARTIAL_SUCCEED)
#define QW_SET_QTID(id, qid, tid) do { *(uint64_t *)(id) = (qid); *(uint64_t *)((char *)(id) + sizeof(qid)) = (tid); } while (0) #define QW_SET_QTID(id, qId, tId) do { *(uint64_t *)(id) = (qId); *(uint64_t *)((char *)(id) + sizeof(qId)) = (tId); } while (0)
#define QW_GET_QTID(id, qid, tid) do { (qid) = *(uint64_t *)(id); (tid) = *(uint64_t *)((char *)(id) + sizeof(qid)); } while (0) #define QW_GET_QTID(id, qId, tId) do { (qId) = *(uint64_t *)(id); (tId) = *(uint64_t *)((char *)(id) + sizeof(qId)); } while (0)
#define QW_IDS() sId, qId, tId
#define QW_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) #define QW_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define QW_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) #define QW_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define QW_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); terrno = _code; return _code; } } while (0)
#define QW_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) #define QW_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#define QW_ELOG(param, ...) qError("QW:%p " param, mgmt, __VA_ARGS__)
#define QW_DLOG(param, ...) qDebug("QW:%p " param, mgmt, __VA_ARGS__)
#define QW_SCH_ELOG(param, ...) qError("QW:%p SID:%"PRIx64" " param, mgmt, sId, __VA_ARGS__)
#define QW_SCH_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64" " param, mgmt, sId, __VA_ARGS__)
#define QW_TASK_ELOG(param, ...) qError("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_WLOG(param, ...) qWarn("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_DLOG(param, ...) qDebug("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_ELOG(param, ...) qError("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_WLOG(param, ...) qWarn("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000
#define QW_LOCK(type, _lock) do { \ #define QW_LOCK(type, _lock) do { \
if (QW_READ == (type)) { \ if (QW_READ == (type)) { \
if ((*(_lock)) < 0) assert(0); \ assert(atomic_load_32((_lock)) >= 0); \
taosRLockLatch(_lock); \ qDebug("QW RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
qDebug("QW RLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ taosRLockLatch(_lock); \
qDebug("QW RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) > 0); \
} else { \ } else { \
if ((*(_lock)) < 0) assert(0); \ assert(atomic_load_32((_lock)) >= 0); \
qDebug("QW WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWLockLatch(_lock); \ taosWLockLatch(_lock); \
qDebug("QW WLOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ qDebug("QW WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
} \ } \
} while (0) } while (0)
#define QW_UNLOCK(type, _lock) do { \ #define QW_UNLOCK(type, _lock) do { \
if (QW_READ == (type)) { \ if (QW_READ == (type)) { \
if ((*(_lock)) <= 0) assert(0); \ assert(atomic_load_32((_lock)) > 0); \
qDebug("QW RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRUnLockLatch(_lock); \ taosRUnLockLatch(_lock); \
qDebug("QW RULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ qDebug("QW RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \
} else { \ } else { \
if ((*(_lock)) <= 0) assert(0); \ assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY); \
qDebug("QW WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWUnLockLatch(_lock); \ taosWUnLockLatch(_lock); \
qDebug("QW WULOCK%p, %s:%d", (_lock), __FILE__, __LINE__); \ qDebug("QW WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
assert(atomic_load_32((_lock)) >= 0); \
} \ } \
} while (0) } while (0)
static int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch, int32_t nOpt);
int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch);
int32_t qwAcquireAddScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch);
int32_t qwAcquireTask(SQWorkerMgmt *mgmt, int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t tId, SQWTaskStatus **task);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
#include "tname.h" #include "tname.h"
#include "dataSinkMgt.h" #include "dataSinkMgt.h"
int32_t qwValidateStatus(int8_t oriStatus, int8_t newStatus) { int32_t qwValidateStatus(SQWorkerMgmt *mgmt, int8_t oriStatus, int8_t newStatus, uint64_t sId, uint64_t qId, uint64_t tId) {
int32_t code = 0; int32_t code = 0;
if (oriStatus == newStatus) { if (oriStatus == newStatus) {
...@@ -62,7 +62,7 @@ int32_t qwValidateStatus(int8_t oriStatus, int8_t newStatus) { ...@@ -62,7 +62,7 @@ int32_t qwValidateStatus(int8_t oriStatus, int8_t newStatus) {
break; break;
default: default:
qError("invalid task status:%d", oriStatus); QW_TASK_ELOG("invalid task status:%d", oriStatus);
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
...@@ -70,22 +70,27 @@ int32_t qwValidateStatus(int8_t oriStatus, int8_t newStatus) { ...@@ -70,22 +70,27 @@ int32_t qwValidateStatus(int8_t oriStatus, int8_t newStatus) {
_return: _return:
qError("invalid task status, from %d to %d", oriStatus, newStatus); QW_TASK_ELOG("invalid task status update from %d to %d", oriStatus, newStatus);
QW_ERR_RET(code); QW_RET(code);
} }
int32_t qwUpdateTaskInfo(SQWTaskStatus *task, int8_t type, void *data) { int32_t qwUpdateTaskInfo(SQWorkerMgmt *mgmt, SQWTaskStatus *task, int8_t type, void *data, uint64_t sId, uint64_t qId, uint64_t tId) {
int32_t code = 0; int32_t code = 0;
int8_t origStatus = 0;
switch (type) { switch (type) {
case QW_TASK_INFO_STATUS: { case QW_TASK_INFO_STATUS: {
int8_t newStatus = *(int8_t *)data; int8_t newStatus = *(int8_t *)data;
QW_ERR_RET(qwValidateStatus(task->status, newStatus)); QW_ERR_RET(qwValidateStatus(mgmt, task->status, newStatus, QW_IDS()));
origStatus = task->status;
task->status = newStatus; task->status = newStatus;
QW_TASK_DLOG("task status updated from %d to %d", origStatus, newStatus);
break; break;
} }
default: default:
qError("uknown task info type:%d", type); QW_TASK_ELOG("unknown task info, type:%d", type);
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
...@@ -96,27 +101,27 @@ int32_t qwAddTaskHandlesToCache(SQWorkerMgmt *mgmt, uint64_t qId, uint64_t tId, ...@@ -96,27 +101,27 @@ int32_t qwAddTaskHandlesToCache(SQWorkerMgmt *mgmt, uint64_t qId, uint64_t tId,
char id[sizeof(qId) + sizeof(tId)] = {0}; char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId); QW_SET_QTID(id, qId, tId);
SQWorkerTaskHandlesCache resCache = {0}; SQWTaskCtx resCache = {0};
resCache.taskHandle = taskHandle; resCache.taskHandle = taskHandle;
resCache.sinkHandle = sinkHandle; resCache.sinkHandle = sinkHandle;
QW_LOCK(QW_WRITE, &mgmt->resLock); QW_LOCK(QW_WRITE, &mgmt->ctxLock);
if (0 != taosHashPut(mgmt->resHash, id, sizeof(id), &resCache, sizeof(SQWorkerTaskHandlesCache))) { if (0 != taosHashPut(mgmt->ctxHash, id, sizeof(id), &resCache, sizeof(SQWTaskCtx))) {
QW_UNLOCK(QW_WRITE, &mgmt->resLock); QW_UNLOCK(QW_WRITE, &mgmt->ctxLock);
qError("taosHashPut queryId[%"PRIx64"] taskId[%"PRIx64"] to resHash failed", qId, tId); QW_TASK_ELOG("taosHashPut task ctx to ctxHash failed, taskHandle:%p, sinkHandle:%p", taskHandle, sinkHandle);
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
QW_UNLOCK(QW_WRITE, &mgmt->resLock); QW_UNLOCK(QW_WRITE, &mgmt->ctxLock);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t qwAddScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch) { int32_t qwAddScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch) {
SQWSchStatus newSch = {0}; SQWSchStatus newSch = {0};
newSch.tasksHash = taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); newSch.tasksHash = taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (NULL == newSch.tasksHash) { if (NULL == newSch.tasksHash) {
qError("taosHashInit %d failed", mgmt->cfg.maxSchTaskNum); QW_SCH_DLOG("taosHashInit %d failed", mgmt->cfg.maxSchTaskNum);
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
...@@ -126,14 +131,18 @@ static int32_t qwAddScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, ...@@ -126,14 +131,18 @@ static int32_t qwAddScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId,
if (0 != code) { if (0 != code) {
if (!HASH_NODE_EXIST(code)) { if (!HASH_NODE_EXIST(code)) {
QW_UNLOCK(QW_WRITE, &mgmt->schLock); QW_UNLOCK(QW_WRITE, &mgmt->schLock);
qError("taosHashPut sId[%"PRIx64"] to scheduleHash failed", sId); QW_SCH_ELOG("taosHashPut new sch to scheduleHash failed, errno:%d", errno);
taosHashCleanup(newSch.tasksHash); taosHashCleanup(newSch.tasksHash);
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
} }
QW_UNLOCK(QW_WRITE, &mgmt->schLock); QW_UNLOCK(QW_WRITE, &mgmt->schLock);
if (TSDB_CODE_SUCCESS == qwAcquireScheduler(rwType, mgmt, sId, sch, QW_NOT_EXIST_ADD)) { if (TSDB_CODE_SUCCESS == qwAcquireScheduler(rwType, mgmt, sId, sch)) {
if (code) {
taosHashCleanup(newSch.tasksHash);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
...@@ -141,7 +150,7 @@ static int32_t qwAddScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, ...@@ -141,7 +150,7 @@ static int32_t qwAddScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch, int32_t nOpt) { int32_t qwAcquireSchedulerImpl(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch, int32_t nOpt) {
QW_LOCK(rwType, &mgmt->schLock); QW_LOCK(rwType, &mgmt->schLock);
*sch = taosHashGet(mgmt->schHash, &sId, sizeof(sId)); *sch = taosHashGet(mgmt->schHash, &sId, sizeof(sId));
if (NULL == (*sch)) { if (NULL == (*sch)) {
...@@ -159,113 +168,119 @@ static int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t s ...@@ -159,113 +168,119 @@ static int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t s
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static FORCE_INLINE void qwReleaseScheduler(int32_t rwType, SQWorkerMgmt *mgmt) { int32_t qwAcquireAddScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch) {
return qwAcquireSchedulerImpl(rwType, mgmt, sId, sch, QW_NOT_EXIST_ADD);
}
int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch) {
return qwAcquireSchedulerImpl(rwType, mgmt, sId, sch, QW_NOT_EXIST_RET_ERR);
}
void qwReleaseScheduler(int32_t rwType, SQWorkerMgmt *mgmt) {
QW_UNLOCK(rwType, &mgmt->schLock); QW_UNLOCK(rwType, &mgmt->schLock);
} }
static int32_t qwAcquireTaskImpl(int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t tId, SQWTaskStatus **task) { int32_t qwAddTaskImpl(SQWorkerMgmt *mgmt, SQWSchStatus *sch, int32_t rwType, uint64_t qId, uint64_t tId, int32_t status, int32_t eOpt, SQWTaskStatus **task) {
int32_t code = 0;
char id[sizeof(qId) + sizeof(tId)] = {0}; char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId); QW_SET_QTID(id, qId, tId);
QW_LOCK(rwType, &sch->tasksLock); SQWTaskStatus ntask = {0};
*task = taosHashGet(sch->tasksHash, id, sizeof(id)); ntask.status = status;
if (NULL == (*task)) {
QW_UNLOCK(rwType, &sch->tasksLock); QW_LOCK(QW_WRITE, &sch->tasksLock);
code = taosHashPut(sch->tasksHash, id, sizeof(id), &ntask, sizeof(ntask));
if (0 != code) {
QW_UNLOCK(QW_WRITE, &sch->tasksLock);
if (HASH_NODE_EXIST(code)) {
if (QW_EXIST_ACQUIRE == eOpt && rwType && task) {
QW_ERR_RET(qwAcquireTask(mgmt, rwType, sch, qId, tId, task));
} else if (QW_EXIST_RET_ERR == eOpt) {
return TSDB_CODE_QRY_TASK_ALREADY_EXIST;
} else {
assert(0);
}
} else {
qError("taosHashPut queryId[%"PRIx64"] taskId[%"PRIx64"] to scheduleHash failed", qId, tId);
return TSDB_CODE_QRY_APP_ERROR;
}
}
QW_UNLOCK(QW_WRITE, &sch->tasksLock);
return TSDB_CODE_QRY_TASK_NOT_EXIST; if (QW_EXIST_ACQUIRE == eOpt && rwType && task) {
QW_ERR_RET(qwAcquireTask(mgmt, rwType, sch, qId, tId, task));
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t qwAcquireTask(int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t tId, SQWTaskStatus **task) { int32_t qwAddTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t status) {
return qwAcquireTaskImpl(rwType, sch, qId, tId, task); SQWSchStatus *tsch = NULL;
} int32_t code = 0;
QW_ERR_RET(qwAcquireAddScheduler(QW_READ, mgmt, sId, &tsch));
static FORCE_INLINE void qwReleaseTask(int32_t rwType, SQWSchStatus *sch) { QW_ERR_JRET(qwAddTaskImpl(mgmt, tsch, 0, qId, tId, status, QW_EXIST_RET_ERR, NULL));
QW_UNLOCK(rwType, &sch->tasksLock);
_return:
qwReleaseScheduler(QW_READ, mgmt);
QW_ERR_RET(code);
} }
int32_t qwAddTaskToSch(int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t tId, int8_t status, int32_t eOpt, SQWTaskStatus **task) {
int32_t code = 0;
int32_t qwAcquireTaskImpl(SQWorkerMgmt *mgmt, int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t tId, int32_t status, int32_t nOpt, SQWTaskStatus **task) {
char id[sizeof(qId) + sizeof(tId)] = {0}; char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId); QW_SET_QTID(id, qId, tId);
SQWTaskStatus ntask = {0}; QW_LOCK(rwType, &sch->tasksLock);
ntask.status = status; *task = taosHashGet(sch->tasksHash, id, sizeof(id));
if (NULL == (*task)) {
while (true) { QW_UNLOCK(rwType, &sch->tasksLock);
QW_LOCK(QW_WRITE, &sch->tasksLock);
int32_t code = taosHashPut(sch->tasksHash, id, sizeof(id), &ntask, sizeof(ntask));
if (0 != code) {
QW_UNLOCK(QW_WRITE, &sch->tasksLock);
if (HASH_NODE_EXIST(code)) {
if (QW_EXIST_ACQUIRE == eOpt && rwType && task) {
if (qwAcquireTask(rwType, sch, qId, tId, task)) {
continue;
}
} else if (QW_EXIST_RET_ERR == eOpt) {
return TSDB_CODE_QRY_TASK_ALREADY_EXIST;
} else {
assert(0);
}
break;
} else {
qError("taosHashPut queryId[%"PRIx64"] taskId[%"PRIx64"] to scheduleHash failed", qId, tId);
return TSDB_CODE_QRY_APP_ERROR;
}
}
QW_UNLOCK(QW_WRITE, &sch->tasksLock);
if (rwType && task) { if (QW_NOT_EXIST_ADD == nOpt) {
if (TSDB_CODE_SUCCESS == qwAcquireTask(rwType, sch, qId, tId, task)) { QW_ERR_RET(qwAddTaskImpl(mgmt, sch, rwType, qId, tId, status, QW_EXIST_ACQUIRE, task));
return TSDB_CODE_SUCCESS; } else if (QW_NOT_EXIST_RET_ERR == nOpt) {
} return TSDB_CODE_QRY_TASK_NOT_EXIST;
} else { } else {
break; assert(0);
} }
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t qwAddTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t status, int32_t eOpt, SQWSchStatus **sch, SQWTaskStatus **task) { int32_t qwAcquireTask(SQWorkerMgmt *mgmt, int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t tId, SQWTaskStatus **task) {
SQWSchStatus *tsch = NULL; return qwAcquireTaskImpl(mgmt, rwType, sch, qId, tId, 0, QW_NOT_EXIST_RET_ERR, task);
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &tsch, QW_NOT_EXIST_ADD)); }
int32_t code = qwAddTaskToSch(QW_READ, tsch, qId, tId, status, eOpt, task); int32_t qwAcquireAddTask(SQWorkerMgmt *mgmt, int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t tId, int32_t status, SQWTaskStatus **task) {
if (code) { return qwAcquireTaskImpl(mgmt, rwType, sch, qId, tId, status, QW_NOT_EXIST_ADD, task);
qwReleaseScheduler(QW_WRITE, mgmt); }
}
if (NULL == task) {
qwReleaseScheduler(QW_READ, mgmt);
} else if (sch) {
*sch = tsch;
}
QW_RET(code); void qwReleaseTask(int32_t rwType, SQWSchStatus *sch) {
QW_UNLOCK(rwType, &sch->tasksLock);
} }
static FORCE_INLINE int32_t qwAcquireTaskHandles(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, SQWorkerTaskHandlesCache **handles) {
int32_t qwAcquireTaskCtx(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, SQWTaskCtx **handles) {
char id[sizeof(queryId) + sizeof(taskId)] = {0}; char id[sizeof(queryId) + sizeof(taskId)] = {0};
QW_SET_QTID(id, queryId, taskId); QW_SET_QTID(id, queryId, taskId);
QW_LOCK(rwType, &mgmt->resLock); QW_LOCK(rwType, &mgmt->ctxLock);
*handles = taosHashGet(mgmt->resHash, id, sizeof(id)); *handles = taosHashGet(mgmt->ctxHash, id, sizeof(id));
if (NULL == (*handles)) { if (NULL == (*handles)) {
QW_UNLOCK(rwType, &mgmt->resLock); QW_UNLOCK(rwType, &mgmt->ctxLock);
return TSDB_CODE_QRY_RES_CACHE_NOT_EXIST; return TSDB_CODE_QRY_RES_CACHE_NOT_EXIST;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static FORCE_INLINE void qwReleaseTaskResCache(int32_t rwType, SQWorkerMgmt *mgmt) { void qwReleaseTaskResCache(int32_t rwType, SQWorkerMgmt *mgmt) {
QW_UNLOCK(rwType, &mgmt->resLock); QW_UNLOCK(rwType, &mgmt->ctxLock);
} }
...@@ -273,7 +288,7 @@ int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRs ...@@ -273,7 +288,7 @@ int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRs
SQWSchStatus *sch = NULL; SQWSchStatus *sch = NULL;
int32_t taskNum = 0; int32_t taskNum = 0;
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)); QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch));
sch->lastAccessTs = taosGetTimestampSec(); sch->lastAccessTs = taosGetTimestampSec();
...@@ -319,7 +334,7 @@ int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRs ...@@ -319,7 +334,7 @@ int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRs
int32_t qwUpdateSchLastAccess(SQWorkerMgmt *mgmt, uint64_t sId) { int32_t qwUpdateSchLastAccess(SQWorkerMgmt *mgmt, uint64_t sId) {
SQWSchStatus *sch = NULL; SQWSchStatus *sch = NULL;
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)); QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch));
sch->lastAccessTs = taosGetTimestampSec(); sch->lastAccessTs = taosGetTimestampSec();
...@@ -333,12 +348,12 @@ int32_t qwUpdateTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint6 ...@@ -333,12 +348,12 @@ int32_t qwUpdateTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint6
SQWTaskStatus *task = NULL; SQWTaskStatus *task = NULL;
int32_t code = 0; int32_t code = 0;
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)); QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch));
QW_ERR_JRET(qwAcquireTask(QW_READ, sch, qId, tId, &task)); QW_ERR_JRET(qwAcquireTask(mgmt, QW_READ, sch, qId, tId, &task));
QW_LOCK(QW_WRITE, &task->lock); QW_LOCK(QW_WRITE, &task->lock);
qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &status); qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &status, QW_IDS());
QW_UNLOCK(QW_WRITE, &task->lock); QW_UNLOCK(QW_WRITE, &task->lock);
_return: _return:
...@@ -355,12 +370,12 @@ int32_t qwGetTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint ...@@ -355,12 +370,12 @@ int32_t qwGetTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint
SQWTaskStatus *task = NULL; SQWTaskStatus *task = NULL;
int32_t code = 0; int32_t code = 0;
if (qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)) { if (qwAcquireScheduler(QW_READ, mgmt, sId, &sch)) {
*taskStatus = JOB_TASK_STATUS_NULL; *taskStatus = JOB_TASK_STATUS_NULL;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (qwAcquireTask(QW_READ, sch, queryId, taskId, &task)) { if (qwAcquireTask(mgmt, QW_READ, sch, queryId, taskId, &task)) {
qwReleaseScheduler(QW_READ, mgmt); qwReleaseScheduler(QW_READ, mgmt);
*taskStatus = JOB_TASK_STATUS_NULL; *taskStatus = JOB_TASK_STATUS_NULL;
...@@ -376,22 +391,15 @@ int32_t qwGetTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint ...@@ -376,22 +391,15 @@ int32_t qwGetTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint
} }
int32_t qwCancelTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId) { int32_t qwCancelTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) {
SQWSchStatus *sch = NULL; SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL; SQWTaskStatus *task = NULL;
int32_t code = 0; int32_t code = 0;
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_ADD)); QW_ERR_RET(qwAcquireAddScheduler(QW_READ, mgmt, sId, &sch));
QW_ERR_JRET(qwAcquireAddTask(mgmt, QW_READ, sch, qId, tId, JOB_TASK_STATUS_NOT_START, &task));
if (qwAcquireTask(QW_READ, sch, queryId, taskId, &task)) {
qwReleaseScheduler(QW_READ, mgmt);
code = qwAddTask(mgmt, sId, queryId, taskId, JOB_TASK_STATUS_NOT_START, QW_EXIST_ACQUIRE, &sch, &task);
if (code) {
qwReleaseScheduler(QW_READ, mgmt);
QW_ERR_RET(code);
}
}
QW_LOCK(QW_WRITE, &task->lock); QW_LOCK(QW_WRITE, &task->lock);
...@@ -409,10 +417,10 @@ int32_t qwCancelTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_ ...@@ -409,10 +417,10 @@ int32_t qwCancelTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (task->status == JOB_TASK_STATUS_FAILED || task->status == JOB_TASK_STATUS_SUCCEED || task->status == JOB_TASK_STATUS_PARTIAL_SUCCEED) { } else if (task->status == JOB_TASK_STATUS_FAILED || task->status == JOB_TASK_STATUS_SUCCEED || task->status == JOB_TASK_STATUS_PARTIAL_SUCCEED) {
newStatus = JOB_TASK_STATUS_CANCELLED; newStatus = JOB_TASK_STATUS_CANCELLED;
QW_ERR_JRET(qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &newStatus)); QW_ERR_JRET(qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &newStatus, QW_IDS()));
} else { } else {
newStatus = JOB_TASK_STATUS_CANCELLING; newStatus = JOB_TASK_STATUS_CANCELLING;
QW_ERR_JRET(qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &newStatus)); QW_ERR_JRET(qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &newStatus, QW_IDS()));
} }
QW_UNLOCK(QW_WRITE, &task->lock); QW_UNLOCK(QW_WRITE, &task->lock);
...@@ -441,55 +449,87 @@ _return: ...@@ -441,55 +449,87 @@ _return:
QW_RET(code); QW_RET(code);
} }
int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId) {
// caller should make sure task is not running
int32_t qwDropTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) {
char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId);
QW_LOCK(QW_WRITE, &mgmt->ctxLock);
SQWTaskCtx *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
if (NULL == ctx) {
QW_UNLOCK(QW_WRITE, &mgmt->ctxLock);
return TSDB_CODE_QRY_RES_CACHE_NOT_EXIST;
}
if (ctx->taskHandle) {
qDestroyTask(ctx->taskHandle);
ctx->taskHandle = NULL;
}
if (ctx->sinkHandle) {
dsDestroyDataSinker(ctx->sinkHandle);
ctx->sinkHandle = NULL;
}
if (taosHashRemove(mgmt->ctxHash, id, sizeof(id))) {
QW_TASK_ELOG("taosHashRemove from ctx hash failed, id:%s", id);
QW_UNLOCK(QW_WRITE, &mgmt->ctxLock);
return TSDB_CODE_QRY_RES_CACHE_NOT_EXIST;
}
QW_UNLOCK(QW_WRITE, &mgmt->ctxLock);
return TSDB_CODE_SUCCESS;
}
int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) {
SQWSchStatus *sch = NULL; SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL; SQWTaskStatus *task = NULL;
int32_t code = 0; int32_t code = 0;
char id[sizeof(queryId) + sizeof(taskId)] = {0};
QW_SET_QTID(id, queryId, taskId); char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId);
QW_LOCK(QW_WRITE, &mgmt->resLock); qwDropTaskCtx(mgmt, sId, qId, tId);
if (mgmt->resHash) {
taosHashRemove(mgmt->resHash, id, sizeof(id));
}
QW_UNLOCK(QW_WRITE, &mgmt->resLock);
if (TSDB_CODE_SUCCESS != qwAcquireScheduler(QW_WRITE, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)) { if (qwAcquireScheduler(QW_WRITE, mgmt, sId, &sch)) {
qWarn("scheduler %"PRIx64" doesn't exist", sId); QW_TASK_WLOG("scheduler does not exist, sch:%p", sch);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (qwAcquireTask(QW_WRITE, sch, queryId, taskId, &task)) { if (qwAcquireTask(mgmt, QW_WRITE, sch, qId, tId, &task)) {
qwReleaseScheduler(QW_WRITE, mgmt); qwReleaseScheduler(QW_WRITE, mgmt);
qWarn("scheduler %"PRIx64" queryId %"PRIx64" taskId:%"PRIx64" doesn't exist", sId, queryId, taskId); QW_TASK_WLOG("task does not exist, task:%p", task);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
taosHashRemove(sch->tasksHash, id, sizeof(id)); QW_TASK_DLOG("drop task, status:%d, code:%x, ready:%d, cancel:%d, drop:%d", task->status, task->code, task->ready, task->cancel, task->drop);
if (taosHashRemove(sch->tasksHash, id, sizeof(id))) {
QW_TASK_ELOG("taosHashRemove task from hash failed, task:%p", task);
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
_return:
qwReleaseTask(QW_WRITE, sch); qwReleaseTask(QW_WRITE, sch);
qwReleaseScheduler(QW_WRITE, mgmt); qwReleaseScheduler(QW_WRITE, mgmt);
return TSDB_CODE_SUCCESS; QW_RET(code);
} }
int32_t qwCancelDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId) { int32_t qwCancelDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) {
SQWSchStatus *sch = NULL; SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL; SQWTaskStatus *task = NULL;
int32_t code = 0; int32_t code = 0;
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_ADD)); QW_ERR_RET(qwAcquireAddScheduler(QW_READ, mgmt, sId, &sch));
if (qwAcquireTask(QW_READ, sch, queryId, taskId, &task)) { QW_ERR_JRET(qwAcquireAddTask(mgmt, QW_READ, sch, qId, tId, JOB_TASK_STATUS_NOT_START, &task));
qwReleaseScheduler(QW_READ, mgmt);
code = qwAddTask(mgmt, sId, queryId, taskId, JOB_TASK_STATUS_NOT_START, QW_EXIST_ACQUIRE, &sch, &task);
if (code) {
qwReleaseScheduler(QW_READ, mgmt);
QW_ERR_RET(code);
}
}
QW_LOCK(QW_WRITE, &task->lock); QW_LOCK(QW_WRITE, &task->lock);
...@@ -500,7 +540,7 @@ int32_t qwCancelDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uin ...@@ -500,7 +540,7 @@ int32_t qwCancelDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uin
if (task->status == JOB_TASK_STATUS_EXECUTING) { if (task->status == JOB_TASK_STATUS_EXECUTING) {
newStatus = JOB_TASK_STATUS_DROPPING; newStatus = JOB_TASK_STATUS_DROPPING;
QW_ERR_JRET(qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &newStatus)); QW_ERR_JRET(qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &newStatus, QW_IDS()));
} else if (task->status == JOB_TASK_STATUS_CANCELLING || task->status == JOB_TASK_STATUS_DROPPING || task->status == JOB_TASK_STATUS_NOT_START) { } else if (task->status == JOB_TASK_STATUS_CANCELLING || task->status == JOB_TASK_STATUS_DROPPING || task->status == JOB_TASK_STATUS_NOT_START) {
QW_UNLOCK(QW_WRITE, &task->lock); QW_UNLOCK(QW_WRITE, &task->lock);
qwReleaseTask(QW_READ, sch); qwReleaseTask(QW_READ, sch);
...@@ -512,11 +552,12 @@ int32_t qwCancelDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uin ...@@ -512,11 +552,12 @@ int32_t qwCancelDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uin
qwReleaseTask(QW_READ, sch); qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt); qwReleaseScheduler(QW_READ, mgmt);
QW_ERR_RET(qwDropTask(mgmt, sId, queryId, taskId)); QW_ERR_RET(qwDropTask(mgmt, sId, qId, tId));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
QW_UNLOCK(QW_WRITE, &task->lock); QW_UNLOCK(QW_WRITE, &task->lock);
qwReleaseTask(QW_READ, sch); qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt); qwReleaseScheduler(QW_READ, mgmt);
...@@ -738,30 +779,35 @@ int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchRe ...@@ -738,30 +779,35 @@ int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchRe
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwCheckAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg, int32_t rspCode) { int32_t qwCheckAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SRpcMsg *pMsg) {
SQWSchStatus *sch = NULL; SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL; SQWTaskStatus *task = NULL;
int32_t code = 0; int32_t code = 0;
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)); QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch));
QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task)); QW_ERR_JRET(qwAcquireTask(mgmt, QW_READ, sch, qId, tId, &task));
QW_LOCK(QW_WRITE, &task->lock); QW_LOCK(QW_WRITE, &task->lock);
if (QW_READY_NOT_RECEIVED == task->ready) { if (QW_READY_NOT_RECEIVED == task->ready) {
QW_SCH_TASK_DLOG("ready not received, ready:%d", task->ready);
goto _return;
} else if (QW_READY_RECEIVED == task->ready) {
task->ready = QW_READY_RESPONSED;
int32_t rspCode = task->code;
QW_UNLOCK(QW_WRITE, &task->lock); QW_UNLOCK(QW_WRITE, &task->lock);
qwReleaseTask(QW_READ, sch); qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt); qwReleaseScheduler(QW_READ, mgmt);
return TSDB_CODE_SUCCESS; QW_ERR_RET(qwBuildAndSendReadyRsp(pMsg, rspCode));
} else if (QW_READY_RECEIVED == task->ready) {
QW_ERR_JRET(qwBuildAndSendReadyRsp(pMsg, rspCode)); QW_SCH_TASK_DLOG("ready response sent, ready:%d", task->ready);
task->ready = QW_READY_RESPONSED; return TSDB_CODE_SUCCESS;
} else if (QW_READY_RESPONSED == task->ready) { } else if (QW_READY_RESPONSED == task->ready) {
qError("query response already send"); QW_SCH_TASK_ELOG("ready response already send, ready:%d", task->ready);
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
} else { } else {
assert(0); assert(0);
...@@ -772,7 +818,6 @@ _return: ...@@ -772,7 +818,6 @@ _return:
if (task) { if (task) {
QW_UNLOCK(QW_WRITE, &task->lock); QW_UNLOCK(QW_WRITE, &task->lock);
qwReleaseTask(QW_READ, sch); qwReleaseTask(QW_READ, sch);
} }
qwReleaseScheduler(QW_READ, mgmt); qwReleaseScheduler(QW_READ, mgmt);
...@@ -780,34 +825,39 @@ _return: ...@@ -780,34 +825,39 @@ _return:
QW_RET(code); QW_RET(code);
} }
int32_t qwSetAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) { int32_t qwSetAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SRpcMsg *pMsg) {
SQWSchStatus *sch = NULL; SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL; SQWTaskStatus *task = NULL;
int32_t code = 0; int32_t code = 0;
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)); QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch));
QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task)); QW_ERR_JRET(qwAcquireTask(mgmt, QW_READ, sch, qId, tId, &task));
QW_LOCK(QW_WRITE, &task->lock); QW_LOCK(QW_WRITE, &task->lock);
if (QW_TASK_READY_RESP(task->status)) {
QW_ERR_JRET(qwBuildAndSendReadyRsp(pMsg, task->code));
int8_t status = task->status;
int32_t errCode = task->code;
if (QW_TASK_READY(status)) {
task->ready = QW_READY_RESPONSED; task->ready = QW_READY_RESPONSED;
QW_UNLOCK(QW_WRITE, &task->lock);
QW_ERR_JRET(qwBuildAndSendReadyRsp(pMsg, errCode));
QW_SCH_TASK_DLOG("task ready responsed, status:%d", status);
} else { } else {
task->ready = QW_READY_RECEIVED; task->ready = QW_READY_RECEIVED;
QW_UNLOCK(QW_WRITE, &task->lock); QW_UNLOCK(QW_WRITE, &task->lock);
qwReleaseTask(QW_READ, sch); QW_SCH_TASK_DLOG("task ready NOT responsed, status:%d", status);
qwReleaseScheduler(QW_READ, mgmt);
return TSDB_CODE_SUCCESS;
} }
_return: _return:
if (task) { if (task) {
QW_UNLOCK(QW_WRITE, &task->lock);
qwReleaseTask(QW_READ, sch); qwReleaseTask(QW_READ, sch);
} }
...@@ -816,7 +866,7 @@ _return: ...@@ -816,7 +866,7 @@ _return:
QW_RET(code); QW_RET(code);
} }
int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, bool *needStop) { int32_t qwCheckAndProcessTaskDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, bool *needStop) {
SQWSchStatus *sch = NULL; SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL; SQWTaskStatus *task = NULL;
int32_t code = 0; int32_t code = 0;
...@@ -824,44 +874,39 @@ int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryI ...@@ -824,44 +874,39 @@ int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryI
*needStop = false; *needStop = false;
if (qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)) { if (qwAcquireScheduler(QW_READ, mgmt, sId, &sch)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (qwAcquireTask(QW_READ, sch, queryId, taskId, &task)) { if (qwAcquireTask(mgmt, QW_READ, sch, qId, tId, &task)) {
qwReleaseScheduler(QW_READ, mgmt); qwReleaseScheduler(QW_READ, mgmt);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
QW_LOCK(QW_READ, &task->lock);
if ((!task->cancel) && (!task->drop)) { if ((!atomic_load_8(&task->cancel)) && (!atomic_load_8(&task->drop))) {
qError("no cancel or drop, but task:%"PRIx64" exists", taskId); QW_TASK_ELOG("no cancel or drop but task exists, status:%d", atomic_load_8(&task->status));
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
QW_UNLOCK(QW_READ, &task->lock);
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
QW_RET(TSDB_CODE_QRY_APP_ERROR);
} }
QW_UNLOCK(QW_READ, &task->lock);
*needStop = true; *needStop = true;
if (task->cancel) { if (atomic_load_8(&task->cancel)) {
QW_LOCK(QW_WRITE, &task->lock); QW_LOCK(QW_WRITE, &task->lock);
qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &status); code = qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &status, QW_IDS());
QW_UNLOCK(QW_WRITE, &task->lock); QW_UNLOCK(QW_WRITE, &task->lock);
QW_ERR_JRET(code);
} }
if (task->drop) { if (task->drop) {
qwReleaseTask(QW_READ, sch); qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt); qwReleaseScheduler(QW_READ, mgmt);
return qwDropTask(mgmt, sId, queryId, taskId); QW_RET(qwDropTask(mgmt, sId, qId, tId));
} }
_return:
qwReleaseTask(QW_READ, sch); qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt); qwReleaseScheduler(QW_READ, mgmt);
...@@ -875,31 +920,27 @@ int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint6 ...@@ -875,31 +920,27 @@ int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint6
int32_t code = 0; int32_t code = 0;
int8_t newStatus = JOB_TASK_STATUS_CANCELLED; int8_t newStatus = JOB_TASK_STATUS_CANCELLED;
code = qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_ADD); code = qwAcquireAddScheduler(QW_READ, mgmt, sId, &sch);
if (code) { if (code) {
qError("sId:%"PRIx64" not in cache", sId); QW_TASK_ELOG("sId:%"PRIx64" not in cache", sId);
QW_ERR_RET(code); QW_ERR_RET(code);
} }
code = qwAcquireTask(QW_READ, sch, qId, tId, &task); code = qwAcquireTask(mgmt, QW_READ, sch, qId, tId, &task);
if (code) { if (code) {
qwReleaseScheduler(QW_READ, mgmt); QW_TASK_ELOG("sId:%"PRIx64" queryId:%"PRIx64" taskId:%"PRIx64" not in cache", sId, qId, tId);
QW_ERR_RET(code);
if (JOB_TASK_STATUS_PARTIAL_SUCCEED == status || JOB_TASK_STATUS_SUCCEED == status) {
qError("sId:%"PRIx64" queryId:%"PRIx64" taskId:%"PRIx64" not in cache", sId, qId, tId);
QW_ERR_RET(code);
}
QW_ERR_RET(qwAddTask(mgmt, sId, qId, tId, status, QW_EXIST_ACQUIRE, &sch, &task));
} }
QW_LOCK(QW_WRITE, &task->lock);
if (task->cancel) { if (task->cancel) {
QW_LOCK(QW_WRITE, &task->lock); qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &newStatus, QW_IDS());
qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &newStatus);
QW_UNLOCK(QW_WRITE, &task->lock);
} }
if (task->drop) { if (task->drop) {
QW_UNLOCK(QW_WRITE, &task->lock);
qwReleaseTask(QW_READ, sch); qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt); qwReleaseScheduler(QW_READ, mgmt);
...@@ -909,11 +950,11 @@ int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint6 ...@@ -909,11 +950,11 @@ int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint6
} }
if (!(task->cancel || task->drop)) { if (!(task->cancel || task->drop)) {
QW_LOCK(QW_WRITE, &task->lock); qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &status, QW_IDS());
qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &status);
task->code = errCode; task->code = errCode;
QW_UNLOCK(QW_WRITE, &task->lock);
} }
QW_UNLOCK(QW_WRITE, &task->lock);
qwReleaseTask(QW_READ, sch); qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt); qwReleaseScheduler(QW_READ, mgmt);
...@@ -921,8 +962,89 @@ int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint6 ...@@ -921,8 +962,89 @@ int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint6
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwScheduleDataSink(SQWTaskCtx *handles, SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) {
if (atomic_load_8(&handles->sinkScheduled)) {
qDebug("data sink already scheduled");
return TSDB_CODE_SUCCESS;
}
SSinkDataReq * req = (SSinkDataReq *)rpcMallocCont(sizeof(SSinkDataReq));
if (NULL == req) {
qError("rpcMallocCont %d failed", (int32_t)sizeof(SSinkDataReq));
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
req->header.vgId = mgmt->nodeId;
req->sId = sId;
req->queryId = queryId;
req->taskId = taskId;
SRpcMsg pNewMsg = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.msgType = TDMT_VND_SCHEDULE_DATA_SINK,
.pCont = req,
.contLen = sizeof(SSinkDataReq),
.code = 0,
};
int32_t code = (*mgmt->putToQueueFp)(mgmt->nodeObj, &pNewMsg);
if (TSDB_CODE_SUCCESS != code) {
qError("put data sink schedule msg to queue failed, code:%x", code);
rpcFreeCont(req);
QW_ERR_RET(code);
}
qDebug("put data sink schedule msg to query queue");
int32_t qwHandleFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS;
}
int32_t qwScheduleQuery(SQWTaskCtx *handles, SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SRpcMsg *pMsg) {
if (atomic_load_8(&handles->queryScheduled)) {
QW_SCH_TASK_ELOG("query already scheduled, queryScheduled:%d", handles->queryScheduled);
return TSDB_CODE_SUCCESS;
}
QW_ERR_RET(qwUpdateTaskStatus(mgmt, sId, qId, tId, JOB_TASK_STATUS_EXECUTING));
SQueryContinueReq * req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq));
if (NULL == req) {
QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(SQueryContinueReq));
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
req->header.vgId = mgmt->nodeId;
req->sId = sId;
req->queryId = qId;
req->taskId = tId;
SRpcMsg pNewMsg = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.msgType = TDMT_VND_QUERY_CONTINUE,
.pCont = req,
.contLen = sizeof(SQueryContinueReq),
.code = 0,
};
int32_t code = (*mgmt->putToQueueFp)(mgmt->nodeObj, &pNewMsg);
if (TSDB_CODE_SUCCESS != code) {
QW_SCH_TASK_ELOG("put query continue msg to queue failed, code:%x", code);
rpcFreeCont(req);
QW_ERR_RET(code);
}
handles->queryScheduled = true;
QW_SCH_TASK_DLOG("put query continue msg to query queue, vgId:%d", mgmt->nodeId);
return TSDB_CODE_SUCCESS;
}
int32_t qwHandleFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SRpcMsg *pMsg) {
SQWSchStatus *sch = NULL; SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL; SQWTaskStatus *task = NULL;
int32_t code = 0; int32_t code = 0;
...@@ -932,22 +1054,30 @@ int32_t qwHandleFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64 ...@@ -932,22 +1054,30 @@ int32_t qwHandleFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64
int32_t dataLength = 0; int32_t dataLength = 0;
SRetrieveTableRsp *rsp = NULL; SRetrieveTableRsp *rsp = NULL;
bool queryEnd = false; bool queryEnd = false;
SQWorkerTaskHandlesCache *handles = NULL; SQWTaskCtx *handles = NULL;
int8_t status = 0;
QW_ERR_JRET(qwAcquireTaskHandles(QW_READ, mgmt, queryId, taskId, &handles)); QW_ERR_JRET(qwAcquireTaskCtx(QW_READ, mgmt, qId, tId, &handles));
QW_LOCK(QW_WRITE, &handles->lock);
if (handles->needRsp) {
QW_UNLOCK(QW_WRITE, &handles->lock);
QW_SCH_TASK_ELOG("last fetch not responsed, needRsp:%d", handles->needRsp);
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
QW_ERR_JRET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)); QW_UNLOCK(QW_WRITE, &handles->lock);
QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task));
QW_LOCK(QW_READ, &task->lock); QW_ERR_JRET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch));
QW_ERR_JRET(qwAcquireTask(mgmt, QW_READ, sch, qId, tId, &task));
if (task->cancel || task->drop) { if (task->cancel || task->drop) {
qError("task is already cancelled or dropped"); QW_SCH_TASK_ELOG("task is already cancelled or dropped, cancel:%d, drop:%d", task->cancel, task->drop);
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
} }
if (task->status != JOB_TASK_STATUS_EXECUTING && task->status != JOB_TASK_STATUS_PARTIAL_SUCCEED) { if (task->status != JOB_TASK_STATUS_EXECUTING && task->status != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
qError("invalid status %d for fetch", task->status); QW_SCH_TASK_ELOG("invalid status %d for fetch", task->status);
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
} }
...@@ -955,6 +1085,9 @@ int32_t qwHandleFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64 ...@@ -955,6 +1085,9 @@ int32_t qwHandleFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64
if (dataLength > 0) { if (dataLength > 0) {
SOutputData output = {0}; SOutputData output = {0};
QW_SCH_TASK_DLOG("task got data in sink, dataLength:%d", dataLength);
QW_ERR_JRET(qwInitFetchRsp(dataLength, &rsp)); QW_ERR_JRET(qwInitFetchRsp(dataLength, &rsp));
output.pData = rsp->data; output.pData = rsp->data;
...@@ -974,29 +1107,39 @@ int32_t qwHandleFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64 ...@@ -974,29 +1107,39 @@ int32_t qwHandleFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64
if (DS_BUF_EMPTY == output.bufStatus && output.queryEnd) { if (DS_BUF_EMPTY == output.bufStatus && output.queryEnd) {
rsp->completed = 1; rsp->completed = 1;
status = JOB_TASK_STATUS_SUCCEED;
QW_SCH_TASK_DLOG("task all fetched, status:%d", status);
QW_ERR_JRET(qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &status, QW_IDS()));
} }
// Note: schedule data sink firstly and will schedule query after it's done
if (output.needSchedule) { if (output.needSchedule) {
//TODO QW_SCH_TASK_DLOG("sink need schedule, queryEnd:%d", output.queryEnd);
} QW_ERR_JRET(qwScheduleDataSink(handles, mgmt, sId, qId, tId, pMsg));
} else if ((!output.queryEnd) && (DS_BUF_LOW == output.bufStatus || DS_BUF_EMPTY == output.bufStatus)) {
if ((!output.queryEnd) && DS_BUF_LOW == output.bufStatus) { QW_SCH_TASK_DLOG("task not end, need to continue, bufStatus:%d", output.bufStatus);
//TODO QW_ERR_JRET(qwScheduleQuery(handles, mgmt, sId, qId, tId, pMsg));
//UPDATE STATUS TO EXECUTING
} }
} else { } else {
if (dataLength < 0) { if (dataLength < 0) {
qError("invalid length from dsGetDataLength, length:%d", dataLength); QW_SCH_TASK_ELOG("invalid length from dsGetDataLength, length:%d", dataLength);
QW_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
} }
if (queryEnd) { if (queryEnd) {
QW_ERR_JRET(qwQueryPostProcess(mgmt, sId, queryId, taskId, JOB_TASK_STATUS_SUCCEED, code)); status = JOB_TASK_STATUS_SUCCEED;
QW_SCH_TASK_DLOG("no data in sink and query end, dataLength:%d", dataLength);
QW_ERR_JRET(qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &status, QW_IDS()));
} else { } else {
if (task->status != JOB_TASK_STATUS_EXECUTING) { assert(0 == handles->needRsp);
qError("invalid status %d for fetch without res", task->status);
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); // MUST IN SCHEDULE OR IN SINK SCHEDULE
}
QW_SCH_TASK_DLOG("no res data in sink, need response later, queryEnd:%d", queryEnd);
QW_LOCK(QW_WRITE, &handles->lock); QW_LOCK(QW_WRITE, &handles->lock);
handles->needRsp = true; handles->needRsp = true;
...@@ -1009,7 +1152,6 @@ int32_t qwHandleFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64 ...@@ -1009,7 +1152,6 @@ int32_t qwHandleFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64
_return: _return:
if (task) { if (task) {
QW_UNLOCK(QW_READ, &task->lock);
qwReleaseTask(QW_READ, sch); qwReleaseTask(QW_READ, sch);
} }
...@@ -1028,7 +1170,12 @@ _return: ...@@ -1028,7 +1170,12 @@ _return:
QW_RET(code); QW_RET(code);
} }
int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt) { int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, void *nodeObj, putReqToQueryQFp fp) {
if (NULL == qWorkerMgmt || NULL == nodeObj || NULL == fp) {
qError("invalid param to init qworker");
QW_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SQWorkerMgmt *mgmt = calloc(1, sizeof(SQWorkerMgmt)); SQWorkerMgmt *mgmt = calloc(1, sizeof(SQWorkerMgmt));
if (NULL == mgmt) { if (NULL == mgmt) {
qError("calloc %d failed", (int32_t)sizeof(SQWorkerMgmt)); qError("calloc %d failed", (int32_t)sizeof(SQWorkerMgmt));
...@@ -1037,29 +1184,46 @@ int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt) { ...@@ -1037,29 +1184,46 @@ int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt) {
if (cfg) { if (cfg) {
mgmt->cfg = *cfg; mgmt->cfg = *cfg;
if (0 == mgmt->cfg.maxSchedulerNum) {
mgmt->cfg.maxSchedulerNum = QWORKER_DEFAULT_SCHEDULER_NUMBER;
}
if (0 == mgmt->cfg.maxTaskNum) {
mgmt->cfg.maxTaskNum = QWORKER_DEFAULT_TASK_NUMBER;
}
if (0 == mgmt->cfg.maxSchTaskNum) {
mgmt->cfg.maxSchTaskNum = QWORKER_DEFAULT_SCH_TASK_NUMBER;
}
} else { } else {
mgmt->cfg.maxSchedulerNum = QWORKER_DEFAULT_SCHEDULER_NUMBER; mgmt->cfg.maxSchedulerNum = QWORKER_DEFAULT_SCHEDULER_NUMBER;
mgmt->cfg.maxResCacheNum = QWORKER_DEFAULT_RES_CACHE_NUMBER; mgmt->cfg.maxTaskNum = QWORKER_DEFAULT_TASK_NUMBER;
mgmt->cfg.maxSchTaskNum = QWORKER_DEFAULT_SCH_TASK_NUMBER; mgmt->cfg.maxSchTaskNum = QWORKER_DEFAULT_SCH_TASK_NUMBER;
} }
mgmt->schHash = taosHashInit(mgmt->cfg.maxSchedulerNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); mgmt->schHash = taosHashInit(mgmt->cfg.maxSchedulerNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
if (NULL == mgmt->schHash) { if (NULL == mgmt->schHash) {
tfree(mgmt); tfree(mgmt);
QW_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler hash failed", mgmt->cfg.maxSchedulerNum); qError("init %d scheduler hash failed", mgmt->cfg.maxSchedulerNum);
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
mgmt->resHash = taosHashInit(mgmt->cfg.maxResCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); mgmt->ctxHash = taosHashInit(mgmt->cfg.maxTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (NULL == mgmt->resHash) { if (NULL == mgmt->ctxHash) {
taosHashCleanup(mgmt->schHash); taosHashCleanup(mgmt->schHash);
mgmt->schHash = NULL; mgmt->schHash = NULL;
tfree(mgmt); tfree(mgmt);
qError("init %d task ctx hash failed", mgmt->cfg.maxTaskNum);
QW_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d res cache hash failed", mgmt->cfg.maxResCacheNum); QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
mgmt->nodeType = nodeType;
mgmt->nodeId = nodeId;
mgmt->nodeObj = nodeObj;
mgmt->putToQueueFp = fp;
*qWorkerMgmt = mgmt; *qWorkerMgmt = mgmt;
qDebug("qworker initialized for node, type:%d, id:%d, handle:%p", mgmt->nodeType, mgmt->nodeId, mgmt);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -1069,105 +1233,164 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -1069,105 +1233,164 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
} }
int32_t code = 0; int32_t code = 0;
bool queryRsped = false;
bool needStop = false;
struct SSubplan *plan = NULL;
SSubQueryMsg *msg = pMsg->pCont; SSubQueryMsg *msg = pMsg->pCont;
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
int32_t rspCode = 0;
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
qError("invalid query msg"); QW_ELOG("invalid query msg, contLen:%d", pMsg->contLen);
QW_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
} }
msg->sId = htobe64(msg->sId); msg->sId = be64toh(msg->sId);
msg->queryId = htobe64(msg->queryId); msg->queryId = be64toh(msg->queryId);
msg->taskId = htobe64(msg->taskId); msg->taskId = be64toh(msg->taskId);
msg->contentLen = ntohl(msg->contentLen); msg->contentLen = ntohl(msg->contentLen);
bool queryRsped = false; uint64_t sId = msg->sId;
bool needStop = false; uint64_t qId = msg->queryId;
struct SSubplan *plan = NULL; uint64_t tId = msg->taskId;
QW_ERR_JRET(qwCheckTaskCancelDrop(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, &needStop)); QW_ERR_JRET(qwCheckAndProcessTaskDrop(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, &needStop));
if (needStop) { if (needStop) {
qWarn("task need stop"); QW_TASK_DLOG("task need stop, msgLen:%d", msg->contentLen);
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED); qwBuildAndSendQueryRsp(pMsg, TSDB_CODE_QRY_TASK_CANCELLED);
QW_ERR_RET(TSDB_CODE_QRY_TASK_CANCELLED);
} }
QW_ERR_JRET(qwAddTask(qWorkerMgmt, sId, qId, tId, JOB_TASK_STATUS_EXECUTING));
code = qStringToSubplan(msg->msg, &plan); code = qStringToSubplan(msg->msg, &plan);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
qError("schId:%"PRIx64",qId:%"PRIx64",taskId:%"PRIx64" string to subplan failed, code:%d", msg->sId, msg->queryId, msg->taskId, code); QW_TASK_ELOG("string to subplan failed, code:%d", code);
QW_ERR_JRET(code); QW_ERR_JRET(code);
} }
qTaskInfo_t pTaskInfo = NULL; qTaskInfo_t pTaskInfo = NULL;
code = qCreateExecTask(node, 0, (struct SSubplan *)plan, &pTaskInfo); code = qCreateExecTask(node, 0, (struct SSubplan *)plan, &pTaskInfo);
if (code) { if (code) {
qError("qCreateExecTask failed, code:%x", code); QW_TASK_ELOG("qCreateExecTask failed, code:%x", code);
QW_ERR_JRET(code); QW_ERR_JRET(code);
} else {
QW_ERR_JRET(qwAddTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, JOB_TASK_STATUS_EXECUTING, QW_EXIST_RET_ERR, NULL, NULL));
} }
QW_ERR_JRET(qwBuildAndSendQueryRsp(pMsg, TSDB_CODE_SUCCESS)); QW_ERR_JRET(qwBuildAndSendQueryRsp(pMsg, TSDB_CODE_SUCCESS));
queryRsped = true; queryRsped = true;
DataSinkHandle sinkHandle = NULL; DataSinkHandle sinkHandle = NULL;
code = qExecTask(pTaskInfo, &sinkHandle); code = qExecTask(pTaskInfo, &sinkHandle);
if (code) { if (code) {
qError("qExecTask failed, code:%x", code); QW_TASK_ELOG("qExecTask failed, code:%x", code);
QW_ERR_JRET(code); QW_ERR_JRET(code);
} else { }
QW_ERR_JRET(qwAddTaskHandlesToCache(qWorkerMgmt, msg->queryId, msg->taskId, pTaskInfo, sinkHandle));
QW_ERR_JRET(qwUpdateTaskStatus(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, JOB_TASK_STATUS_PARTIAL_SUCCEED)); QW_ERR_JRET(qwAddTaskHandlesToCache(qWorkerMgmt, msg->queryId, msg->taskId, pTaskInfo, sinkHandle));
}
_return: _return:
if (queryRsped) { if (code) {
code = qwCheckAndSendReadyRsp(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, pMsg, code); rspCode = code;
} else { }
code = qwBuildAndSendQueryRsp(pMsg, code);
if (!queryRsped) {
code = qwBuildAndSendQueryRsp(pMsg, rspCode);
if (TSDB_CODE_SUCCESS == rspCode && code) {
rspCode = code;
}
} }
int8_t status = 0; int8_t status = 0;
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != rspCode) {
status = JOB_TASK_STATUS_FAILED; status = JOB_TASK_STATUS_FAILED;
} else { } else {
status = JOB_TASK_STATUS_PARTIAL_SUCCEED; status = JOB_TASK_STATUS_PARTIAL_SUCCEED;
} }
qwQueryPostProcess(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, status, code); qwQueryPostProcess(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, status, rspCode);
if (queryRsped) {
qwCheckAndSendReadyRsp(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, pMsg);
}
QW_RET(code); QW_RET(rspCode);
} }
int32_t qWorkerProcessQueryContinueMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t qWorkerProcessQueryContinueMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t code = 0; int32_t code = 0;
int8_t status = 0; int8_t status = 0;
bool queryDone = false; bool queryDone = false;
uint64_t sId, qId, tId; SQueryContinueReq *req = (SQueryContinueReq *)pMsg->pCont;
bool needStop = false;
SQWTaskCtx *handles = NULL;
//TODO call executer to continue execute subquery QW_ERR_JRET(qwAcquireTaskCtx(QW_READ, qWorkerMgmt, req->queryId, req->taskId, &handles));
code = 0; QW_LOCK(QW_WRITE, &handles->lock);
void *data = NULL;
queryDone = false; qTaskInfo_t taskHandle = handles->taskHandle;
//TODO call executer to continue execute subquery DataSinkHandle sinkHandle = handles->sinkHandle;
QW_UNLOCK(QW_WRITE, &handles->lock);
qwReleaseTaskResCache(QW_READ, qWorkerMgmt);
QW_ERR_JRET(qwCheckAndProcessTaskDrop(qWorkerMgmt, req->sId, req->queryId, req->taskId, &needStop));
if (needStop) {
qWarn("task need stop");
QW_ERR_JRET(qwAcquireTaskCtx(QW_READ, qWorkerMgmt, req->queryId, req->taskId, &handles));
QW_LOCK(QW_WRITE, &handles->lock);
if (handles->needRsp) {
qwBuildAndSendQueryRsp(pMsg, TSDB_CODE_QRY_TASK_CANCELLED);
handles->needRsp = false;
}
QW_UNLOCK(QW_WRITE, &handles->lock);
qwReleaseTaskResCache(QW_READ, qWorkerMgmt);
QW_ERR_RET(TSDB_CODE_QRY_TASK_CANCELLED);
}
DataSinkHandle newHandle = NULL;
code = qExecTask(taskHandle, &newHandle);
if (code) {
qError("qExecTask failed, code:%x", code);
QW_ERR_JRET(code);
}
if (sinkHandle != newHandle) {
qError("data sink mis-match");
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
_return:
QW_ERR_JRET(qwAcquireTaskCtx(QW_READ, qWorkerMgmt, req->queryId, req->taskId, &handles));
QW_LOCK(QW_WRITE, &handles->lock);
if (handles->needRsp) {
code = qwBuildAndSendQueryRsp(pMsg, code);
handles->needRsp = false;
}
handles->queryScheduled = false;
QW_UNLOCK(QW_WRITE, &handles->lock);
qwReleaseTaskResCache(QW_READ, qWorkerMgmt);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
status = JOB_TASK_STATUS_FAILED; status = JOB_TASK_STATUS_FAILED;
} else if (queryDone) {
status = JOB_TASK_STATUS_SUCCEED;
} else { } else {
status = JOB_TASK_STATUS_PARTIAL_SUCCEED; status = JOB_TASK_STATUS_PARTIAL_SUCCEED;
} }
code = qwQueryPostProcess(qWorkerMgmt, sId, qId, tId, status, code); code = qwQueryPostProcess(qWorkerMgmt, req->sId, req->queryId, req->taskId, status, code);
QW_RET(code); QW_RET(code);
} }
int32_t qWorkerProcessSinkDataMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){ int32_t qWorkerProcessDataSinkMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT; return TSDB_CODE_QRY_INVALID_INPUT;
} }
...@@ -1176,8 +1399,9 @@ int32_t qWorkerProcessSinkDataMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){ ...@@ -1176,8 +1399,9 @@ int32_t qWorkerProcessSinkDataMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){
if (NULL == msg || pMsg->contLen < sizeof(*msg)) { if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
qError("invalid sink data msg"); qError("invalid sink data msg");
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
//dsScheduleProcess();
//TODO //TODO
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -42,6 +42,11 @@ int32_t qwtStringToPlan(const char* str, SSubplan** subplan) { ...@@ -42,6 +42,11 @@ int32_t qwtStringToPlan(const char* str, SSubplan** subplan) {
return 0; return 0;
} }
int32_t qwtPutReqToQueue(void *node, struct SRpcMsg *pMsg) {
return 0;
}
void qwtRpcSendResponse(const SRpcMsg *pRsp) { void qwtRpcSendResponse(const SRpcMsg *pRsp) {
if (TDMT_VND_TASKS_STATUS_RSP == pRsp->msgType) { if (TDMT_VND_TASKS_STATUS_RSP == pRsp->msgType) {
SSchedulerStatusRsp *rsp = (SSchedulerStatusRsp *)pRsp->pCont; SSchedulerStatusRsp *rsp = (SSchedulerStatusRsp *)pRsp->pCont;
...@@ -258,7 +263,7 @@ TEST(seqTest, normalCase) { ...@@ -258,7 +263,7 @@ TEST(seqTest, normalCase) {
stubSetStringToPlan(); stubSetStringToPlan();
stubSetRpcSendResponse(); stubSetRpcSendResponse();
code = qWorkerInit(NULL, &mgmt); code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
statusMsg.sId = htobe64(1); statusMsg.sId = htobe64(1);
...@@ -328,7 +333,7 @@ TEST(seqTest, cancelFirst) { ...@@ -328,7 +333,7 @@ TEST(seqTest, cancelFirst) {
stubSetStringToPlan(); stubSetStringToPlan();
stubSetRpcSendResponse(); stubSetRpcSendResponse();
code = qWorkerInit(NULL, &mgmt); code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
statusMsg.sId = htobe64(1); statusMsg.sId = htobe64(1);
...@@ -402,7 +407,7 @@ TEST(seqTest, randCase) { ...@@ -402,7 +407,7 @@ TEST(seqTest, randCase) {
srand(time(NULL)); srand(time(NULL));
code = qWorkerInit(NULL, &mgmt); code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
int32_t t = 0; int32_t t = 0;
...@@ -446,7 +451,7 @@ TEST(seqTest, multithreadRand) { ...@@ -446,7 +451,7 @@ TEST(seqTest, multithreadRand) {
srand(time(NULL)); srand(time(NULL));
code = qWorkerInit(NULL, &mgmt); code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
pthread_attr_t thattr; pthread_attr_t thattr;
......
...@@ -36,11 +36,31 @@ enum { ...@@ -36,11 +36,31 @@ enum {
SCH_WRITE, SCH_WRITE,
}; };
typedef struct SSchApiStat {
} SSchApiStat;
typedef struct SSchRuntimeStat {
} SSchRuntimeStat;
typedef struct SSchJobStat {
} SSchJobStat;
typedef struct SSchedulerStat {
SSchApiStat api;
SSchRuntimeStat runtime;
SSchJobStat job;
} SSchedulerStat;
typedef struct SSchedulerMgmt { typedef struct SSchedulerMgmt {
uint64_t taskId; // sequential taksId uint64_t taskId; // sequential taksId
uint64_t sId; // schedulerId uint64_t sId; // schedulerId
SSchedulerCfg cfg; SSchedulerCfg cfg;
SHashObj *jobs; // key: queryId, value: SQueryJob* SHashObj *jobs; // key: queryId, value: SQueryJob*
SSchedulerStat stat;
} SSchedulerMgmt; } SSchedulerMgmt;
typedef struct SSchCallbackParam { typedef struct SSchCallbackParam {
......
...@@ -1460,34 +1460,37 @@ void scheduleFreeJob(void *job) { ...@@ -1460,34 +1460,37 @@ void scheduleFreeJob(void *job) {
} }
SSchJob *pJob = job; SSchJob *pJob = job;
uint64_t queryId = pJob->queryId;
if (0 != taosHashRemove(schMgmt.jobs, &pJob->queryId, sizeof(pJob->queryId))) { if (SCH_GET_JOB_STATUS(pJob) > 0) {
SCH_JOB_ELOG("taosHashRemove job from list failed, may already freed, pJob:%p", pJob); if (0 != taosHashRemove(schMgmt.jobs, &pJob->queryId, sizeof(pJob->queryId))) {
return; SCH_JOB_ELOG("taosHashRemove job from list failed, may already freed, pJob:%p", pJob);
} return;
}
schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_DROPPING); schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_DROPPING);
SCH_JOB_DLOG("job removed from list, no further ref, ref:%d", atomic_load_32(&pJob->ref)); SCH_JOB_DLOG("job removed from list, no further ref, ref:%d", atomic_load_32(&pJob->ref));
while (true) { while (true) {
int32_t ref = atomic_load_32(&pJob->ref); int32_t ref = atomic_load_32(&pJob->ref);
if (0 == ref) { if (0 == ref) {
break; break;
} else if (ref > 0) { } else if (ref > 0) {
usleep(1); usleep(1);
} else { } else {
assert(0); assert(0);
}
} }
}
SCH_JOB_DLOG("job no ref now, status:%d", SCH_GET_JOB_STATUS(pJob)); SCH_JOB_DLOG("job no ref now, status:%d", SCH_GET_JOB_STATUS(pJob));
if (pJob->status == JOB_TASK_STATUS_EXECUTING) { if (pJob->status == JOB_TASK_STATUS_EXECUTING) {
schCancelJob(pJob); schCancelJob(pJob);
} }
schDropJobAllTasks(pJob); schDropJobAllTasks(pJob);
}
pJob->subPlans = NULL; // it is a reference to pDag->pSubplans pJob->subPlans = NULL; // it is a reference to pDag->pSubplans
...@@ -1513,6 +1516,8 @@ void scheduleFreeJob(void *job) { ...@@ -1513,6 +1516,8 @@ void scheduleFreeJob(void *job) {
tfree(pJob->res); tfree(pJob->res);
tfree(pJob); tfree(pJob);
qDebug("QID:%"PRIx64" job freed", queryId);
} }
void schedulerDestroy(void) { void schedulerDestroy(void) {
......
...@@ -79,6 +79,7 @@ void schtBuildQueryDag(SQueryDag *dag) { ...@@ -79,6 +79,7 @@ void schtBuildQueryDag(SQueryDag *dag) {
scanPlan->level = 1; scanPlan->level = 1;
scanPlan->pParents = taosArrayInit(1, POINTER_BYTES); scanPlan->pParents = taosArrayInit(1, POINTER_BYTES);
scanPlan->pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode)); scanPlan->pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode));
scanPlan->msgType = TDMT_VND_QUERY;
mergePlan->id.queryId = qId; mergePlan->id.queryId = qId;
mergePlan->id.templateId = 0x4444444444; mergePlan->id.templateId = 0x4444444444;
...@@ -89,6 +90,7 @@ void schtBuildQueryDag(SQueryDag *dag) { ...@@ -89,6 +90,7 @@ void schtBuildQueryDag(SQueryDag *dag) {
mergePlan->pChildren = taosArrayInit(1, POINTER_BYTES); mergePlan->pChildren = taosArrayInit(1, POINTER_BYTES);
mergePlan->pParents = NULL; mergePlan->pParents = NULL;
mergePlan->pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode)); mergePlan->pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode));
mergePlan->msgType = TDMT_VND_QUERY;
SSubplan *mergePointer = (SSubplan *)taosArrayPush(merge, &mergePlan); SSubplan *mergePointer = (SSubplan *)taosArrayPush(merge, &mergePlan);
SSubplan *scanPointer = (SSubplan *)taosArrayPush(scan, &scanPlan); SSubplan *scanPointer = (SSubplan *)taosArrayPush(scan, &scanPlan);
...@@ -163,6 +165,11 @@ void schtExecNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) { ...@@ -163,6 +165,11 @@ void schtExecNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) {
} }
void schtRpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) {
}
void schtSetPlanToString() { void schtSetPlanToString() {
static Stub stub; static Stub stub;
...@@ -190,6 +197,20 @@ void schtSetExecNode() { ...@@ -190,6 +197,20 @@ void schtSetExecNode() {
} }
} }
void schtSetRpcSendRequest() {
static Stub stub;
stub.set(rpcSendRequest, schtRpcSendRequest);
{
AddrAny any("libtransport.so");
std::map<std::string,void*> result;
any.get_global_func_addr_dynsym("^rpcSendRequest$", result);
for (const auto& f : result) {
stub.set(f.second, schtRpcSendRequest);
}
}
}
void *schtSendRsp(void *param) { void *schtSendRsp(void *param) {
SSchJob *job = NULL; SSchJob *job = NULL;
int32_t code = 0; int32_t code = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册