提交 536e3aeb 编写于 作者: D dapan1121

feature/qnode

上级 0be19201
......@@ -1002,7 +1002,7 @@ typedef struct {
} SUpdateTagValRsp;
typedef struct SSubQueryMsg {
uint64_t schedulerId;
uint64_t sId;
uint64_t queryId;
uint64_t taskId;
uint32_t contentLen;
......@@ -1010,7 +1010,7 @@ typedef struct SSubQueryMsg {
} SSubQueryMsg;
typedef struct SResReadyMsg {
uint64_t schedulerId;
uint64_t sId;
uint64_t queryId;
uint64_t taskId;
} SResReadyMsg;
......@@ -1020,13 +1020,13 @@ typedef struct SResReadyRsp {
} SResReadyRsp;
typedef struct SResFetchMsg {
uint64_t schedulerId;
uint64_t sId;
uint64_t queryId;
uint64_t taskId;
} SResFetchMsg;
typedef struct SSchTasksStatusMsg {
uint64_t schedulerId;
uint64_t sId;
} SSchTasksStatusMsg;
typedef struct STaskStatus {
......@@ -1041,7 +1041,7 @@ typedef struct SSchedulerStatusRsp {
} SSchedulerStatusRsp;
typedef struct STaskCancelMsg {
uint64_t schedulerId;
uint64_t sId;
uint64_t queryId;
uint64_t taskId;
} STaskCancelMsg;
......@@ -1051,7 +1051,7 @@ typedef struct STaskCancelRsp {
} STaskCancelRsp;
typedef struct STaskDropMsg {
uint64_t schedulerId;
uint64_t sId;
uint64_t queryId;
uint64_t taskId;
} STaskDropMsg;
......
......@@ -22,13 +22,27 @@ int vnodeQueryOpen(SVnode *pVnode) {
int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
vInfo("query message is processed");
qWorkerProcessQueryMsg(pVnode, pVnode->pQuery, pMsg);
return 0;
return qWorkerProcessQueryMsg(pVnode, pVnode->pQuery, pMsg);
}
int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
vInfo("fetch message is processed");
qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg);
switch (pMsg->msgType) {
case TDMT_VND_FETCH:
return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg);
case TDMT_VND_RES_READY:
return qWorkerProcessReadyMsg(pVnode, pVnode->pQuery, pMsg);
case TDMT_VND_TASKS_STATUS:
return qWorkerProcessStatusMsg(pVnode, pVnode->pQuery, pMsg);
case TDMT_VND_CANCEL_TASK:
return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg);
case TDMT_VND_DROP_TASK:
return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg);
default:
vError("unknown msg type:%d in fetch queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR;
break;
}
return 0;
}
......
......@@ -42,25 +42,41 @@ enum {
QW_WRITE,
};
typedef struct SQWorkerTaskStatus {
enum {
QW_EXIST_ACQUIRE = 1,
QW_EXIST_RET_ERR,
};
enum {
QW_NOT_EXIST_RET_ERR = 1,
QW_NOT_EXIST_ADD,
};
enum {
QW_ADD_RET_ERR = 1,
QW_ADD_ACQUIRE,
};
typedef struct SQWTaskStatus {
SRWLatch lock;
int32_t code;
int8_t status;
int8_t ready;
bool cancel;
bool drop;
} SQWorkerTaskStatus;
} SQWTaskStatus;
typedef struct SQWorkerResCache {
SRWLatch lock;
void *data;
} SQWorkerResCache;
typedef struct SQWorkerSchStatus {
typedef struct SQWSchStatus {
int32_t lastAccessTs; // timestamp in second
SRWLatch tasksLock;
SHashObj *tasksHash; // key:queryId+taskId, value: SQWorkerTaskStatus
} SQWorkerSchStatus;
} SQWSchStatus;
// Qnode/Vnode level task management
typedef struct SQWorkerMgmt {
......@@ -71,7 +87,7 @@ typedef struct SQWorkerMgmt {
SHashObj *resHash; //key: queryId+taskId, value: SQWorkerResCache
} SQWorkerMgmt;
#define QW_GOT_RES_DATA(data) (false)
#define QW_GOT_RES_DATA(data) (true)
#define QW_LOW_RES_DATA(data) (false)
#define QW_TASK_NOT_EXIST(code) (TSDB_CODE_QRY_SCH_NOT_EXIST == (code) || TSDB_CODE_QRY_TASK_NOT_EXIST == (code))
......@@ -89,6 +105,8 @@ typedef struct SQWorkerMgmt {
#define QW_LOCK(type, _lock) (QW_READ == (type) ? taosRLockLatch(_lock) : taosWLockLatch(_lock))
#define QW_UNLOCK(type, _lock) (QW_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock))
static int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch, int32_t nOpt);
#ifdef __cplusplus
}
......
......@@ -4,38 +4,42 @@
#include "qworkerInt.h"
#include "planner.h"
int32_t qwCheckStatusSwitch(int8_t oriStatus, int8_t newStatus) {
int32_t qwValidateStatus(int8_t oriStatus, int8_t newStatus) {
int32_t code = 0;
if (oriStatus == newStatus) {
if (newStatus == JOB_TASK_STATUS_CANCELLING) {
return TSDB_CODE_SUCCESS;
}
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
switch (oriStatus) {
case JOB_TASK_STATUS_NULL:
if (newStatus != JOB_TASK_STATUS_EXECUTING && newStatus != JOB_TASK_STATUS_FAILED ) {
if (newStatus != JOB_TASK_STATUS_EXECUTING
&& newStatus != JOB_TASK_STATUS_FAILED
&& newStatus != JOB_TASK_STATUS_NOT_START) {
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
break;
case JOB_TASK_STATUS_NOT_START:
if (newStatus != JOB_TASK_STATUS_EXECUTING && newStatus != JOB_TASK_STATUS_FAILED) {
if (newStatus != JOB_TASK_STATUS_CANCELLED) {
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
break;
case JOB_TASK_STATUS_EXECUTING:
if (newStatus != JOB_TASK_STATUS_SUCCEED && newStatus != JOB_TASK_STATUS_FAILED && newStatus != JOB_TASK_STATUS_CANCELLING) {
if (newStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED
&& newStatus != JOB_TASK_STATUS_FAILED
&& newStatus != JOB_TASK_STATUS_CANCELLING
&& newStatus != JOB_TASK_STATUS_CANCELLED
&& newStatus != JOB_TASK_STATUS_DROPPING) {
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
break;
case JOB_TASK_STATUS_PARTIAL_SUCCEED:
if (newStatus != JOB_TASK_STATUS_EXECUTING && newStatus != JOB_TASK_STATUS_CANCELLING) {
if (newStatus != JOB_TASK_STATUS_EXECUTING
&& newStatus != JOB_TASK_STATUS_SUCCEED
&& newStatus != JOB_TASK_STATUS_CANCELLED) {
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
......@@ -49,6 +53,10 @@ int32_t qwCheckStatusSwitch(int8_t oriStatus, int8_t newStatus) {
break;
case JOB_TASK_STATUS_CANCELLED:
case JOB_TASK_STATUS_DROPPING:
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
break;
default:
qError("invalid task status:%d", oriStatus);
return TSDB_CODE_QRY_APP_ERROR;
......@@ -58,17 +66,17 @@ int32_t qwCheckStatusSwitch(int8_t oriStatus, int8_t newStatus) {
_return:
qError("invalid task status:%d", oriStatus);
qError("invalid task status, from %d to %d", oriStatus, newStatus);
QW_ERR_RET(code);
}
int32_t qwUpdateTaskInfo(SQWorkerTaskStatus *task, int8_t type, void *data) {
int32_t qwUpdateTaskInfo(SQWTaskStatus *task, int8_t type, void *data) {
int32_t code = 0;
switch (type) {
case QW_TASK_INFO_STATUS: {
int8_t newStatus = *(int8_t *)data;
QW_ERR_RET(qwCheckStatusSwitch(task->status, newStatus));
QW_ERR_RET(qwValidateStatus(task->status, newStatus));
task->status = newStatus;
break;
}
......@@ -80,9 +88,9 @@ int32_t qwUpdateTaskInfo(SQWorkerTaskStatus *task, int8_t type, void *data) {
return TSDB_CODE_SUCCESS;
}
int32_t qwAddTaskResult(SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, void *data) {
char id[sizeof(queryId) + sizeof(taskId)] = {0};
QW_SET_QTID(id, queryId, taskId);
int32_t qwAddTaskResCache(SQWorkerMgmt *mgmt, uint64_t qId, uint64_t tId, void *data) {
char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId);
SQWorkerResCache resCache = {0};
resCache.data = data;
......@@ -90,7 +98,7 @@ int32_t qwAddTaskResult(SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, v
QW_LOCK(QW_WRITE, &mgmt->resLock);
if (0 != taosHashPut(mgmt->resHash, id, sizeof(id), &resCache, sizeof(SQWorkerResCache))) {
QW_UNLOCK(QW_WRITE, &mgmt->resLock);
qError("taosHashPut queryId[%"PRIx64"] taskId[%"PRIx64"] to resHash failed", queryId, taskId);
qError("taosHashPut queryId[%"PRIx64"] taskId[%"PRIx64"] to resHash failed", qId, tId);
return TSDB_CODE_QRY_APP_ERROR;
}
......@@ -99,37 +107,8 @@ int32_t qwAddTaskResult(SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, v
return TSDB_CODE_SUCCESS;
}
int32_t qwGetTaskResult(SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, void **data) {
char id[sizeof(queryId) + sizeof(taskId)] = {0};
QW_SET_QTID(id, queryId, taskId);
SQWorkerResCache *resCache = taosHashGet(mgmt->resHash, id, sizeof(id));
if (NULL == resCache) {
qError("no task res for queryId[%"PRIx64"] taskId[%"PRIx64"]", queryId, taskId);
return TSDB_CODE_QRY_APP_ERROR;
}
*data = resCache->data;
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t schedulerId, SQWorkerSchStatus **sch) {
QW_LOCK(rwType, &mgmt->schLock);
*sch = taosHashGet(mgmt->schHash, &schedulerId, sizeof(schedulerId));
if (NULL == (*sch)) {
QW_LOCK(rwType, &mgmt->schLock);
return TSDB_CODE_QRY_SCH_NOT_EXIST;
}
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE int32_t qwInsertAndAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t schedulerId, SQWorkerSchStatus **sch) {
SQWorkerSchStatus newSch = {0};
static int32_t qwAddScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch) {
SQWSchStatus newSch = {0};
newSch.tasksHash = taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (NULL == newSch.tasksHash) {
qError("taosHashInit %d failed", mgmt->cfg.maxSchTaskNum);
......@@ -138,19 +117,18 @@ static FORCE_INLINE int32_t qwInsertAndAcquireScheduler(int32_t rwType, SQWorker
while (true) {
QW_LOCK(QW_WRITE, &mgmt->schLock);
int32_t code = taosHashPut(mgmt->schHash, &schedulerId, sizeof(schedulerId), &newSch, sizeof(newSch));
int32_t code = taosHashPut(mgmt->schHash, &sId, sizeof(sId), &newSch, sizeof(newSch));
if (0 != code) {
if (!HASH_NODE_EXIST(code)) {
QW_UNLOCK(QW_WRITE, &mgmt->schLock);
qError("taosHashPut schedulerId[%"PRIx64"] to scheduleHash failed", schedulerId);
qError("taosHashPut sId[%"PRIx64"] to scheduleHash failed", sId);
taosHashCleanup(newSch.tasksHash);
return TSDB_CODE_QRY_APP_ERROR;
}
}
QW_UNLOCK(QW_WRITE, &mgmt->schLock);
if (TSDB_CODE_SUCCESS == qwAcquireScheduler(rwType, mgmt, schedulerId, sch)) {
taosHashCleanup(newSch.tasksHash);
if (TSDB_CODE_SUCCESS == qwAcquireScheduler(rwType, mgmt, sId, sch, QW_NOT_EXIST_ADD)) {
return TSDB_CODE_SUCCESS;
}
}
......@@ -159,63 +137,122 @@ static FORCE_INLINE int32_t qwInsertAndAcquireScheduler(int32_t rwType, SQWorker
}
static int32_t qwAcquireScheduler(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t sId, SQWSchStatus **sch, int32_t nOpt) {
QW_LOCK(rwType, &mgmt->schLock);
*sch = taosHashGet(mgmt->schHash, &sId, sizeof(sId));
if (NULL == (*sch)) {
QW_UNLOCK(rwType, &mgmt->schLock);
if (QW_NOT_EXIST_ADD == nOpt) {
return qwAddScheduler(rwType, mgmt, sId, sch);
} else if (QW_NOT_EXIST_RET_ERR == nOpt) {
return TSDB_CODE_QRY_SCH_NOT_EXIST;
} else {
assert(0);
}
}
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE void qwReleaseScheduler(int32_t rwType, SQWorkerMgmt *mgmt) {
QW_UNLOCK(rwType, &mgmt->schLock);
}
static FORCE_INLINE int32_t qwAcquireTask(int32_t rwType, SQWorkerSchStatus *sch, uint64_t queryId, uint64_t taskId, SQWorkerTaskStatus **task) {
char id[sizeof(queryId) + sizeof(taskId)] = {0};
QW_SET_QTID(id, queryId, taskId);
static int32_t qwAcquireTaskImpl(int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t tId, SQWTaskStatus **task) {
char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId);
QW_LOCK(rwType, &sch->tasksLock);
*task = taosHashGet(sch->tasksHash, id, sizeof(id));
if (NULL == (*task)) {
QW_UNLOCK(rwType, &sch->tasksLock);
return TSDB_CODE_QRY_TASK_NOT_EXIST;
}
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE int32_t qwInsertAndAcquireTask(int32_t rwType, SQWorkerSchStatus *sch, uint64_t queryId, uint64_t taskId, int8_t status, bool *inserted, SQWorkerTaskStatus **task) {
char id[sizeof(queryId) + sizeof(taskId)] = {0};
QW_SET_QTID(id, queryId, taskId);
static int32_t qwAcquireTask(int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t tId, SQWTaskStatus **task) {
return qwAcquireTaskImpl(rwType, sch, qId, tId, task);
}
static FORCE_INLINE void qwReleaseTask(int32_t rwType, SQWSchStatus *sch) {
QW_UNLOCK(rwType, &sch->tasksLock);
}
int32_t qwAddTaskToSch(int32_t rwType, SQWSchStatus *sch, uint64_t qId, uint64_t tId, int8_t status, int32_t eOpt, SQWTaskStatus **task) {
int32_t code = 0;
char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId);
SQWTaskStatus ntask = {0};
ntask.status = status;
while (true) {
*inserted = false;
QW_LOCK(QW_WRITE, &sch->tasksLock);
int32_t code = taosHashPut(sch->tasksHash, id, sizeof(id), &status, sizeof(status));
int32_t code = taosHashPut(sch->tasksHash, id, sizeof(id), &ntask, sizeof(ntask));
if (0 != code) {
QW_UNLOCK(QW_WRITE, &sch->tasksLock);
if (HASH_NODE_EXIST(code)) {
if (qwAcquireTask(rwType, sch, queryId, taskId, task)) {
continue;
if (QW_EXIST_ACQUIRE == eOpt && rwType && task) {
if (qwAcquireTask(rwType, sch, qId, tId, task)) {
continue;
}
} else if (QW_EXIST_RET_ERR == eOpt) {
return TSDB_CODE_QRY_TASK_ALREADY_EXIST;
} else {
assert(0);
}
break;
} else {
qError("taosHashPut queryId[%"PRIx64"] taskId[%"PRIx64"] to scheduleHash failed", queryId, taskId);
qError("taosHashPut queryId[%"PRIx64"] taskId[%"PRIx64"] to scheduleHash failed", qId, tId);
return TSDB_CODE_QRY_APP_ERROR;
}
}
QW_UNLOCK(QW_WRITE, &sch->tasksLock);
*inserted = true;
if (TSDB_CODE_SUCCESS == qwAcquireTask(rwType, sch, queryId, taskId, task)) {
return TSDB_CODE_SUCCESS;
if (rwType && task) {
if (TSDB_CODE_SUCCESS == qwAcquireTask(rwType, sch, qId, tId, task)) {
return TSDB_CODE_SUCCESS;
}
} else {
break;
}
}
}
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE void qwReleaseTask(int32_t rwType, SQWorkerSchStatus *sch) {
QW_UNLOCK(rwType, &sch->tasksLock);
static int32_t qwAddTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t status, int32_t eOpt, SQWSchStatus **sch, SQWTaskStatus **task) {
SQWSchStatus *tsch = NULL;
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &tsch, QW_NOT_EXIST_ADD));
int32_t code = qwAddTaskToSch(QW_READ, tsch, qId, tId, status, eOpt, task);
if (code) {
qwReleaseScheduler(QW_WRITE, mgmt);
}
if (NULL == task) {
qwReleaseScheduler(QW_READ, mgmt);
} else if (sch) {
*sch = tsch;
}
QW_RET(code);
}
static FORCE_INLINE int32_t qwAcquireTaskResCache(int32_t rwType, SQWorkerMgmt *mgmt, uint64_t queryId, uint64_t taskId, SQWorkerResCache **res) {
char id[sizeof(queryId) + sizeof(taskId)] = {0};
QW_SET_QTID(id, queryId, taskId);
......@@ -235,27 +272,24 @@ static FORCE_INLINE void qwReleaseTaskResCache(int32_t rwType, SQWorkerMgmt *mgm
}
int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, SSchedulerStatusRsp **rsp) {
SQWorkerSchStatus *schStatus = NULL;
int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp) {
SQWSchStatus *sch = NULL;
int32_t taskNum = 0;
if (qwAcquireScheduler(QW_READ, mgmt, schedulerId, &schStatus)) {
qWarn("no scheduler for schedulerId[%"PRIx64"]", schedulerId);
} else {
schStatus->lastAccessTs = taosGetTimestampSec();
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR));
sch->lastAccessTs = taosGetTimestampSec();
QW_LOCK(QW_READ, &schStatus->tasksLock);
taskNum = taosHashGetSize(schStatus->tasksHash);
}
QW_LOCK(QW_READ, &sch->tasksLock);
taskNum = taosHashGetSize(sch->tasksHash);
int32_t size = sizeof(SSchedulerStatusRsp) + sizeof((*rsp)->status[0]) * taskNum;
*rsp = calloc(1, size);
if (NULL == *rsp) {
qError("calloc %d failed", size);
if (schStatus) {
QW_UNLOCK(QW_READ, &schStatus->tasksLock);
qwReleaseScheduler(QW_READ, mgmt);
}
QW_UNLOCK(QW_READ, &sch->tasksLock);
qwReleaseScheduler(QW_READ, mgmt);
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
......@@ -264,23 +298,19 @@ int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, SScheduler
size_t keyLen = 0;
int32_t i = 0;
if (schStatus) {
void *pIter = taosHashIterate(schStatus->tasksHash, NULL);
while (pIter) {
SQWorkerTaskStatus *taskStatus = (SQWorkerTaskStatus *)pIter;
taosHashGetKey(pIter, &key, &keyLen);
void *pIter = taosHashIterate(sch->tasksHash, NULL);
while (pIter) {
SQWTaskStatus *taskStatus = (SQWTaskStatus *)pIter;
taosHashGetKey(pIter, &key, &keyLen);
QW_GET_QTID(key, (*rsp)->status[i].queryId, (*rsp)->status[i].taskId);
(*rsp)->status[i].status = taskStatus->status;
pIter = taosHashIterate(schStatus->tasksHash, pIter);
}
}
QW_GET_QTID(key, (*rsp)->status[i].queryId, (*rsp)->status[i].taskId);
(*rsp)->status[i].status = taskStatus->status;
pIter = taosHashIterate(sch->tasksHash, pIter);
}
if (schStatus) {
QW_UNLOCK(QW_READ, &schStatus->tasksLock);
qwReleaseScheduler(QW_READ, mgmt);
}
QW_UNLOCK(QW_READ, &sch->tasksLock);
qwReleaseScheduler(QW_READ, mgmt);
(*rsp)->num = taskNum;
......@@ -289,115 +319,81 @@ int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, SScheduler
int32_t qwUpdateSchLastAccess(SQWorkerMgmt *mgmt, uint64_t schedulerId) {
SQWorkerSchStatus *schStatus = NULL;
int32_t qwUpdateSchLastAccess(SQWorkerMgmt *mgmt, uint64_t sId) {
SQWSchStatus *sch = NULL;
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &schStatus));
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR));
schStatus->lastAccessTs = taosGetTimestampSec();
sch->lastAccessTs = taosGetTimestampSec();
qwReleaseScheduler(QW_READ, mgmt);
return TSDB_CODE_SUCCESS;
}
int32_t qwGetTaskStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, int8_t *taskStatus) {
SQWorkerSchStatus *sch = NULL;
SQWorkerTaskStatus *task = NULL;
int32_t qwUpdateTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t status) {
SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL;
int32_t code = 0;
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch));
QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task));
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR));
*taskStatus = task->status;
QW_ERR_JRET(qwAcquireTask(QW_READ, sch, qId, tId, &task));
QW_LOCK(QW_WRITE, &task->lock);
qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &status);
QW_UNLOCK(QW_WRITE, &task->lock);
_return:
if (task) {
qwReleaseTask(QW_READ, sch);
}
if (sch) {
qwReleaseScheduler(QW_READ, mgmt);
}
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
QW_RET(code);
}
int32_t qwSwitchTaskStatus(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, int8_t taskStatus) {
SQWorkerSchStatus *sch = NULL;
SQWorkerTaskStatus *task = NULL;
int32_t qwGetTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, int8_t *taskStatus) {
SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL;
int32_t code = 0;
bool inserted = false;
if (qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)) {
if (qwCheckStatusSwitch(JOB_TASK_STATUS_NULL, taskStatus)) {
qError("switch status error, not start to %d", taskStatus);
QW_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
QW_ERR_RET(qwInsertAndAcquireScheduler(QW_READ, mgmt, schedulerId, &sch));
if (qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)) {
*taskStatus = JOB_TASK_STATUS_NULL;
return TSDB_CODE_SUCCESS;
}
if (qwAcquireTask(QW_READ, sch, queryId, taskId, &task)) {
if (qwCheckStatusSwitch(JOB_TASK_STATUS_NOT_START, taskStatus)) {
qwReleaseScheduler(QW_READ, mgmt);
qError("switch status error, not start to %d", taskStatus);
QW_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
QW_ERR_JRET(qwInsertAndAcquireTask(QW_READ, sch, queryId, taskId, taskStatus, &inserted, &task));
if (inserted) {
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
return TSDB_CODE_SUCCESS;
}
QW_LOCK(QW_WRITE, &task->lock);
code = qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &taskStatus);
QW_UNLOCK(QW_WRITE, &task->lock);
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
qwReleaseScheduler(QW_READ, mgmt);
QW_RET(code);
*taskStatus = JOB_TASK_STATUS_NULL;
return TSDB_CODE_SUCCESS;
}
QW_LOCK(QW_WRITE, &task->lock);
code = qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &taskStatus);
QW_UNLOCK(QW_WRITE, &task->lock);
_return:
*taskStatus = task->status;
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
qwReleaseScheduler(QW_READ, mgmt);
QW_RET(code);
}
int32_t qwCancelTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId) {
SQWorkerSchStatus *sch = NULL;
SQWorkerTaskStatus *task = NULL;
int32_t qwCancelTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId) {
SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL;
int32_t code = 0;
if (TSDB_CODE_SUCCESS != qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)) {
QW_ERR_RET(qwSwitchTaskStatus(mgmt, schedulerId, queryId, taskId, JOB_TASK_STATUS_NOT_START));
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch));
}
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_ADD));
if (qwAcquireTask(QW_READ, sch, queryId, taskId, &task)) {
code = qwSwitchTaskStatus(mgmt, schedulerId, queryId, taskId, JOB_TASK_STATUS_NOT_START);
qwReleaseScheduler(QW_READ, mgmt);
code = qwAddTask(mgmt, sId, queryId, taskId, JOB_TASK_STATUS_NOT_START, QW_EXIST_ACQUIRE, &sch, &task);
if (code) {
qwReleaseScheduler(QW_READ, mgmt);
QW_ERR_RET(code);
}
QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task));
}
QW_LOCK(QW_WRITE, &task->lock);
......@@ -423,6 +419,7 @@ int32_t qwCancelTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId,
}
QW_UNLOCK(QW_WRITE, &task->lock);
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
......@@ -449,9 +446,9 @@ _return:
int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId) {
SQWorkerSchStatus *sch = NULL;
SQWorkerTaskStatus *task = NULL;
int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId) {
SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL;
int32_t code = 0;
char id[sizeof(queryId) + sizeof(taskId)] = {0};
QW_SET_QTID(id, queryId, taskId);
......@@ -462,15 +459,15 @@ int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, u
}
QW_UNLOCK(QW_WRITE, &mgmt->resLock);
if (TSDB_CODE_SUCCESS != qwAcquireScheduler(QW_WRITE, mgmt, schedulerId, &sch)) {
qWarn("scheduler %"PRIx64" doesn't exist", schedulerId);
if (TSDB_CODE_SUCCESS != qwAcquireScheduler(QW_WRITE, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)) {
qWarn("scheduler %"PRIx64" doesn't exist", sId);
return TSDB_CODE_SUCCESS;
}
if (qwAcquireTask(QW_WRITE, sch, queryId, taskId, &task)) {
qwReleaseScheduler(QW_WRITE, mgmt);
qWarn("scheduler %"PRIx64" queryId %"PRIx64" taskId:%"PRIx64" doesn't exist", schedulerId, queryId, taskId);
qWarn("scheduler %"PRIx64" queryId %"PRIx64" taskId:%"PRIx64" doesn't exist", sId, queryId, taskId);
return TSDB_CODE_SUCCESS;
}
......@@ -483,21 +480,21 @@ int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, u
}
int32_t qwCancelDropTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId) {
SQWorkerSchStatus *sch = NULL;
SQWorkerTaskStatus *task = NULL;
int32_t qwCancelDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId) {
SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL;
int32_t code = 0;
if (TSDB_CODE_SUCCESS != qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)) {
qWarn("scheduler %"PRIx64" doesn't exist", schedulerId);
return TSDB_CODE_SUCCESS;
}
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_ADD));
if (qwAcquireTask(QW_READ, sch, queryId, taskId, &task)) {
qwReleaseScheduler(QW_READ, mgmt);
qWarn("scheduler %"PRIx64" queryId %"PRIx64" taskId:%"PRIx64" doesn't exist", schedulerId, queryId, taskId);
return TSDB_CODE_SUCCESS;
code = qwAddTask(mgmt, sId, queryId, taskId, JOB_TASK_STATUS_NOT_START, QW_EXIST_ACQUIRE, &sch, &task);
if (code) {
qwReleaseScheduler(QW_READ, mgmt);
QW_ERR_RET(code);
}
}
QW_LOCK(QW_WRITE, &task->lock);
......@@ -508,7 +505,7 @@ int32_t qwCancelDropTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t quer
int8_t newStatus = 0;
if (task->status == JOB_TASK_STATUS_EXECUTING) {
newStatus = JOB_TASK_STATUS_CANCELLING;
newStatus = JOB_TASK_STATUS_DROPPING;
QW_ERR_JRET(qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &newStatus));
} else if (task->status == JOB_TASK_STATUS_CANCELLING || task->status == JOB_TASK_STATUS_DROPPING || task->status == JOB_TASK_STATUS_NOT_START) {
QW_UNLOCK(QW_WRITE, &task->lock);
......@@ -521,7 +518,7 @@ int32_t qwCancelDropTask(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t quer
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
QW_ERR_RET(qwDropTask(mgmt, schedulerId, queryId, taskId));
QW_ERR_RET(qwDropTask(mgmt, sId, queryId, taskId));
return TSDB_CODE_SUCCESS;
}
......@@ -673,12 +670,12 @@ int32_t qwBuildAndSendDropRsp(SRpcMsg *pMsg, int32_t code) {
int32_t qwCheckAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg, int32_t rspCode) {
SQWorkerSchStatus *sch = NULL;
SQWorkerTaskStatus *task = NULL;
int32_t qwCheckAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg, int32_t rspCode) {
SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL;
int32_t code = 0;
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch));
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR));
QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task));
......@@ -717,12 +714,12 @@ _return:
QW_RET(code);
}
int32_t qwSetAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) {
SQWorkerSchStatus *sch = NULL;
SQWorkerTaskStatus *task = NULL;
int32_t qwSetAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) {
SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL;
int32_t code = 0;
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch));
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR));
QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task));
......@@ -756,15 +753,15 @@ _return:
QW_RET(code);
}
int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, bool *needStop) {
SQWorkerSchStatus *sch = NULL;
SQWorkerTaskStatus *task = NULL;
int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, bool *needStop) {
SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL;
int32_t code = 0;
int8_t status = JOB_TASK_STATUS_CANCELLED;
*needStop = false;
if (qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)) {
if (qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR)) {
return TSDB_CODE_SUCCESS;
}
......@@ -776,11 +773,13 @@ int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_
QW_LOCK(QW_READ, &task->lock);
if ((!task->cancel) && (!task->drop)) {
qError("no cancel or drop, but task:%"PRIx64" exists", taskId);
QW_UNLOCK(QW_READ, &task->lock);
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
return TSDB_CODE_SUCCESS;
QW_RET(TSDB_CODE_QRY_APP_ERROR);
}
QW_UNLOCK(QW_READ, &task->lock);
......@@ -791,30 +790,40 @@ int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_
QW_LOCK(QW_WRITE, &task->lock);
qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &status);
QW_UNLOCK(QW_WRITE, &task->lock);
} else if (task->drop) {
}
if (task->drop) {
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
qwDropTask(mgmt, schedulerId, queryId, taskId);
return qwDropTask(mgmt, sId, queryId, taskId);
}
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
return TSDB_CODE_SUCCESS;
}
int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) {
SQWorkerSchStatus *sch = NULL;
SQWorkerTaskStatus *task = NULL;
int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) {
SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL;
int32_t code = 0;
int32_t needRsp = true;
void *data = NULL;
QW_ERR_JRET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch));
QW_ERR_JRET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_RET_ERR));
QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task));
QW_LOCK(QW_READ, &task->lock);
if (task->status != JOB_TASK_STATUS_EXECUTING && task->status != JOB_TASK_STATUS_PARTIAL_SUCCEED && task->status != JOB_TASK_STATUS_SUCCEED) {
if (task->cancel || task->drop) {
qError("task is already cancelled or dropped");
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
if (task->status != JOB_TASK_STATUS_EXECUTING && task->status != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
qError("invalid status %d for fetch", task->status);
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
......@@ -854,37 +863,46 @@ _return:
QW_RET(code);
}
int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t schedulerId, uint64_t queryId, uint64_t taskId, int8_t status, int32_t errCode) {
SQWorkerSchStatus *sch = NULL;
SQWorkerTaskStatus *task = NULL;
int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t status, int32_t errCode) {
SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL;
int32_t code = 0;
int8_t newStatus = JOB_TASK_STATUS_CANCELLED;
code = qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch);
code = qwAcquireScheduler(QW_READ, mgmt, sId, &sch, QW_NOT_EXIST_ADD);
if (code) {
qError("schedulerId:%"PRIx64" not in cache", schedulerId);
qError("sId:%"PRIx64" not in cache", sId);
QW_ERR_RET(code);
}
code = qwAcquireTask(QW_READ, sch, queryId, taskId, &task);
code = qwAcquireTask(QW_READ, sch, qId, tId, &task);
if (code) {
qwReleaseScheduler(QW_READ, mgmt);
qError("schedulerId:%"PRIx64" queryId:%"PRIx64" taskId:%"PRIx64" not in cache", schedulerId, queryId, taskId);
QW_ERR_RET(code);
if (JOB_TASK_STATUS_PARTIAL_SUCCEED == status || JOB_TASK_STATUS_SUCCEED == status) {
qError("sId:%"PRIx64" queryId:%"PRIx64" taskId:%"PRIx64" not in cache", sId, qId, tId);
QW_ERR_RET(code);
}
QW_ERR_RET(qwAddTask(mgmt, sId, qId, tId, status, QW_EXIST_ACQUIRE, &sch, &task));
}
if (task->cancel) {
QW_LOCK(QW_WRITE, &task->lock);
qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &newStatus);
QW_UNLOCK(QW_WRITE, &task->lock);
} else if (task->drop) {
}
if (task->drop) {
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
qwDropTask(mgmt, schedulerId, queryId, taskId);
qwDropTask(mgmt, sId, qId, tId);
return TSDB_CODE_SUCCESS;
} else {
}
if (!(task->cancel || task->drop)) {
QW_LOCK(QW_WRITE, &task->lock);
qwUpdateTaskInfo(task, QW_TASK_INFO_STATUS, &status);
task->code = errCode;
......@@ -938,24 +956,24 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
int32_t code = 0;
SSubQueryMsg *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
qError("invalid query msg");
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
QW_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
msg->schedulerId = htobe64(msg->schedulerId);
msg->sId = htobe64(msg->sId);
msg->queryId = htobe64(msg->queryId);
msg->taskId = htobe64(msg->taskId);
msg->contentLen = ntohl(msg->contentLen);
bool queryDone = false;
bool queryRsp = false;
bool queryRsped = false;
bool needStop = false;
SSubplan *plan = NULL;
int32_t code = 0;
QW_ERR_JRET(qwCheckTaskCancelDrop(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, &needStop));
QW_ERR_JRET(qwCheckTaskCancelDrop(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, &needStop));
if (needStop) {
qWarn("task need stop");
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
......@@ -963,7 +981,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
code = qStringToSubplan(msg->msg, &plan);
if (TSDB_CODE_SUCCESS != code) {
qError("schId:%"PRIx64",qId:%"PRIx64",taskId:%"PRIx64" string to subplan failed, code:%d", msg->schedulerId, msg->queryId, msg->taskId, code);
qError("schId:%"PRIx64",qId:%"PRIx64",taskId:%"PRIx64" string to subplan failed, code:%d", msg->sId, msg->queryId, msg->taskId, code);
QW_ERR_JRET(code);
}
......@@ -974,12 +992,12 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if (code) {
QW_ERR_JRET(code);
} else {
QW_ERR_JRET(qwSwitchTaskStatus(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, JOB_TASK_STATUS_EXECUTING));
QW_ERR_JRET(qwAddTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, JOB_TASK_STATUS_EXECUTING, QW_EXIST_RET_ERR, NULL, NULL));
}
QW_ERR_JRET(qwBuildAndSendQueryRsp(pMsg, TSDB_CODE_SUCCESS));
queryRsp = true;
queryRsped = true;
//TODO call executer to execute subquery
code = 0;
......@@ -990,29 +1008,29 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if (code) {
QW_ERR_JRET(code);
} else {
QW_ERR_JRET(qwAddTaskResult(qWorkerMgmt, msg->queryId, msg->taskId, data));
QW_ERR_JRET(qwAddTaskResCache(qWorkerMgmt, msg->queryId, msg->taskId, data));
QW_ERR_JRET(qwSwitchTaskStatus(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, JOB_TASK_STATUS_PARTIAL_SUCCEED));
}
QW_ERR_JRET(qwUpdateTaskStatus(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, JOB_TASK_STATUS_PARTIAL_SUCCEED));
}
_return:
if (queryRsp) {
code = qwCheckAndSendReadyRsp(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, pMsg, code);
if (queryRsped) {
code = qwCheckAndSendReadyRsp(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, pMsg, code);
} else {
code = qwBuildAndSendQueryRsp(pMsg, code);
}
int8_t status = 0;
if (TSDB_CODE_SUCCESS != code || queryDone) {
if (code) {
status = JOB_TASK_STATUS_FAILED; //TODO set CANCELLED from code
} else {
status = JOB_TASK_STATUS_SUCCEED;
}
qwQueryPostProcess(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, status, code);
if (TSDB_CODE_SUCCESS != code) {
status = JOB_TASK_STATUS_FAILED;
} else if (queryDone) {
status = JOB_TASK_STATUS_SUCCEED;
} else {
status = JOB_TASK_STATUS_PARTIAL_SUCCEED;
}
qwQueryPostProcess(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, status, code);
QW_RET(code);
}
......@@ -1023,12 +1041,16 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){
}
SResReadyMsg *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
qError("invalid task status msg");
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
QW_ERR_RET(qwSetAndSendReadyRsp(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, pMsg));
msg->sId = htobe64(msg->sId);
msg->queryId = htobe64(msg->queryId);
msg->taskId = htobe64(msg->taskId);
QW_ERR_RET(qwSetAndSendReadyRsp(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, pMsg));
return TSDB_CODE_SUCCESS;
}
......@@ -1040,14 +1062,16 @@ int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t code = 0;
SSchTasksStatusMsg *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
qError("invalid task status msg");
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
msg->sId = htobe64(msg->sId);
SSchedulerStatusRsp *sStatus = NULL;
QW_ERR_JRET(qwGetSchTasksStatus(qWorkerMgmt, msg->schedulerId, &sStatus));
QW_ERR_JRET(qwGetSchTasksStatus(qWorkerMgmt, msg->sId, &sStatus));
_return:
......@@ -1062,11 +1086,15 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
}
SResFetchMsg *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
QW_ERR_RET(qwUpdateSchLastAccess(qWorkerMgmt, msg->schedulerId));
msg->sId = htobe64(msg->sId);
msg->queryId = htobe64(msg->queryId);
msg->taskId = htobe64(msg->taskId);
QW_ERR_RET(qwUpdateSchLastAccess(qWorkerMgmt, msg->sId));
void *data = NULL;
SQWorkerResCache *res = NULL;
......@@ -1074,7 +1102,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
QW_ERR_RET(qwAcquireTaskResCache(QW_READ, qWorkerMgmt, msg->queryId, msg->taskId, &res));
QW_ERR_JRET(qwHandleFetch(res, qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, pMsg));
QW_ERR_JRET(qwHandleFetch(res, qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, pMsg));
_return:
......@@ -1090,12 +1118,16 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t code = 0;
STaskCancelMsg *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
qError("invalid task cancel msg");
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId));
msg->sId = htobe64(msg->sId);
msg->queryId = htobe64(msg->queryId);
msg->taskId = htobe64(msg->taskId);
QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId));
_return:
......@@ -1111,12 +1143,16 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t code = 0;
STaskDropMsg *msg = pMsg->pCont;
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
qError("invalid task drop msg");
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
QW_ERR_JRET(qwCancelDropTask(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId));
msg->sId = htobe64(msg->sId);
msg->queryId = htobe64(msg->queryId);
msg->taskId = htobe64(msg->taskId);
QW_ERR_JRET(qwCancelDropTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId));
_return:
......@@ -1125,6 +1161,31 @@ _return:
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerContinueQuery(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t code = 0;
int8_t status = 0;
bool queryDone = false;
uint64_t sId, qId, tId;
//TODO call executer to continue execute subquery
code = 0;
void *data = NULL;
queryDone = false;
//TODO call executer to continue execute subquery
if (TSDB_CODE_SUCCESS != code) {
status = JOB_TASK_STATUS_FAILED;
} else if (queryDone) {
status = JOB_TASK_STATUS_SUCCEED;
} else {
status = JOB_TASK_STATUS_PARTIAL_SUCCEED;
}
code = qwQueryPostProcess(qWorkerMgmt, sId, qId, tId, status, code);
QW_RET(code);
}
void qWorkerDestroy(void **qWorkerMgmt) {
if (NULL == qWorkerMgmt || NULL == *qWorkerMgmt) {
......
......@@ -40,6 +40,11 @@ int32_t qwtStringToPlan(const char* str, SSubplan** subplan) {
return 0;
}
void qwtRpcSendResponse(const SRpcMsg *pRsp) {
return;
}
void stubSetStringToPlan() {
static Stub stub;
......@@ -54,6 +59,20 @@ void stubSetStringToPlan() {
}
}
void stubSetRpcSendResponse() {
static Stub stub;
stub.set(rpcSendResponse, qwtRpcSendResponse);
{
AddrAny any("libplanner.so");
std::map<std::string,void*> result;
any.get_global_func_addr_dynsym("^rpcSendResponse$", result);
for (const auto& f : result) {
stub.set(f.second, qwtRpcSendResponse);
}
}
}
}
......@@ -68,30 +87,35 @@ TEST(testCase, normalCase) {
SRpcMsg dropRpc = {0};
SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100);
queryMsg->queryId = htobe64(1);
queryMsg->schedulerId = htobe64(1);
queryMsg->sId = htobe64(1);
queryMsg->taskId = htobe64(1);
queryMsg->contentLen = htonl(100);
queryRpc.pCont = queryMsg;
queryRpc.contLen = sizeof(SSubQueryMsg) + 100;
SResReadyMsg readyMsg = {0};
readyMsg.schedulerId = htobe64(1);
readyMsg.sId = htobe64(1);
readyMsg.queryId = htobe64(1);
readyMsg.taskId = htobe64(1);
readyRpc.pCont = &readyMsg;
readyRpc.contLen = sizeof(SResReadyMsg);
SResFetchMsg fetchMsg = {0};
fetchMsg.schedulerId = htobe64(1);
fetchMsg.sId = htobe64(1);
fetchMsg.queryId = htobe64(1);
fetchMsg.taskId = htobe64(1);
fetchRpc.pCont = &fetchMsg;
fetchRpc.contLen = sizeof(SResFetchMsg);
STaskDropMsg dropMsg = {0};
dropMsg.schedulerId = htobe64(1);
dropMsg.sId = htobe64(1);
dropMsg.queryId = htobe64(1);
dropMsg.taskId = htobe64(1);
dropRpc.pCont = &dropMsg;
dropRpc.contLen = sizeof(STaskDropMsg);
stubSetStringToPlan();
stubSetRpcSendResponse();
code = qWorkerInit(NULL, &mgmt);
ASSERT_EQ(code, 0);
......@@ -107,7 +131,8 @@ TEST(testCase, normalCase) {
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc);
ASSERT_EQ(code, 0);
qWorkerDestroy(&mgmt);
}
......
......@@ -38,7 +38,7 @@ enum {
typedef struct SSchedulerMgmt {
uint64_t taskId;
uint64_t schedulerId;
uint64_t sId;
SSchedulerCfg cfg;
SHashObj *jobs; // key: queryId, value: SQueryJob*
} SSchedulerMgmt;
......
......@@ -343,7 +343,7 @@ int32_t schAsyncSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
SSubQueryMsg *pMsg = msg;
pMsg->schedulerId = htobe64(schMgmt.schedulerId);
pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = htobe64(task->taskId);
pMsg->contentLen = htonl(task->msgLen);
......@@ -359,7 +359,7 @@ int32_t schAsyncSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
}
SResReadyMsg *pMsg = msg;
pMsg->schedulerId = htobe64(schMgmt.schedulerId);
pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = htobe64(task->taskId);
break;
......@@ -376,7 +376,7 @@ int32_t schAsyncSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
}
SResFetchMsg *pMsg = msg;
pMsg->schedulerId = htobe64(schMgmt.schedulerId);
pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = htobe64(task->taskId);
break;
......@@ -390,7 +390,7 @@ int32_t schAsyncSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) {
}
STaskDropMsg *pMsg = msg;
pMsg->schedulerId = htobe64(schMgmt.schedulerId);
pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(job->queryId);
pMsg->taskId = htobe64(task->taskId);
break;
......@@ -717,7 +717,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", schMgmt.cfg.maxJobNum);
}
schMgmt.schedulerId = 1; //TODO GENERATE A UUID
schMgmt.sId = 1; //TODO GENERATE A UUID
return TSDB_CODE_SUCCESS;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册