提交 b1980e8f 编写于 作者: D dapan1121

feature/qnode

上级 e2e08176
...@@ -70,6 +70,7 @@ typedef struct SQWTaskCtx { ...@@ -70,6 +70,7 @@ typedef struct SQWTaskCtx {
SRWLatch lock; SRWLatch lock;
int8_t sinkScheduled; int8_t sinkScheduled;
int8_t queryScheduled; int8_t queryScheduled;
bool needRsp; bool needRsp;
qTaskInfo_t taskHandle; qTaskInfo_t taskHandle;
DataSinkHandle sinkHandle; DataSinkHandle sinkHandle;
...@@ -99,7 +100,7 @@ typedef struct SQWorkerMgmt { ...@@ -99,7 +100,7 @@ typedef struct SQWorkerMgmt {
#define QW_TASK_NOT_EXIST(code) (TSDB_CODE_QRY_SCH_NOT_EXIST == (code) || TSDB_CODE_QRY_TASK_NOT_EXIST == (code)) #define QW_TASK_NOT_EXIST(code) (TSDB_CODE_QRY_SCH_NOT_EXIST == (code) || TSDB_CODE_QRY_TASK_NOT_EXIST == (code))
#define QW_TASK_ALREADY_EXIST(code) (TSDB_CODE_QRY_TASK_ALREADY_EXIST == (code)) #define QW_TASK_ALREADY_EXIST(code) (TSDB_CODE_QRY_TASK_ALREADY_EXIST == (code))
#define QW_TASK_READY_RESP(status) (status == JOB_TASK_STATUS_SUCCEED || status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED || status == JOB_TASK_STATUS_PARTIAL_SUCCEED) #define QW_TASK_READY(status) (status == JOB_TASK_STATUS_SUCCEED || status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED || status == JOB_TASK_STATUS_PARTIAL_SUCCEED)
#define QW_SET_QTID(id, qId, tId) do { *(uint64_t *)(id) = (qId); *(uint64_t *)((char *)(id) + sizeof(qId)) = (tId); } while (0) #define QW_SET_QTID(id, qId, tId) do { *(uint64_t *)(id) = (qId); *(uint64_t *)((char *)(id) + sizeof(qId)) = (tId); } while (0)
#define QW_GET_QTID(id, qId, tId) do { (qId) = *(uint64_t *)(id); (tId) = *(uint64_t *)((char *)(id) + sizeof(qId)); } while (0) #define QW_GET_QTID(id, qId, tId) do { (qId) = *(uint64_t *)(id); (tId) = *(uint64_t *)((char *)(id) + sizeof(qId)); } while (0)
#define QW_IDS() sId, qId, tId #define QW_IDS() sId, qId, tId
...@@ -111,16 +112,16 @@ typedef struct SQWorkerMgmt { ...@@ -111,16 +112,16 @@ typedef struct SQWorkerMgmt {
#define QW_ELOG(param, ...) qError("QW:%p " param, mgmt, __VA_ARGS__) #define QW_ELOG(param, ...) qError("QW:%p " param, mgmt, __VA_ARGS__)
#define QW_DLOG(param, ...) qDebug("QW:%p " param, mgmt, __VA_ARGS__) #define QW_DLOG(param, ...) qDebug("QW:%p " param, mgmt, __VA_ARGS__)
#define QW_SCH_ELOG(param, ...) qError("QW:%p SID:%"PRIx64 param, mgmt, sId, __VA_ARGS__) #define QW_SCH_ELOG(param, ...) qError("QW:%p SID:%"PRIx64" " param, mgmt, sId, __VA_ARGS__)
#define QW_SCH_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64 param, mgmt, sId, __VA_ARGS__) #define QW_SCH_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64" " param, mgmt, sId, __VA_ARGS__)
#define QW_TASK_ELOG(param, ...) qError("QW:%p QID:%"PRIx64",TID:%"PRIx64 param, mgmt, qId, tId, __VA_ARGS__) #define QW_TASK_ELOG(param, ...) qError("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_WLOG(param, ...) qWarn("QW:%p QID:%"PRIx64",TID:%"PRIx64 param, mgmt, qId, tId, __VA_ARGS__) #define QW_TASK_WLOG(param, ...) qWarn("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_DLOG(param, ...) qDebug("QW:%p QID:%"PRIx64",TID:%"PRIx64 param, mgmt, qId, tId, __VA_ARGS__) #define QW_TASK_DLOG(param, ...) qDebug("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_ELOG(param, ...) qError("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64 param, mgmt, sId, qId, tId, __VA_ARGS__) #define QW_SCH_TASK_ELOG(param, ...) qError("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_WLOG(param, ...) qWarn("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64 param, mgmt, sId, qId, tId, __VA_ARGS__) #define QW_SCH_TASK_WLOG(param, ...) qWarn("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64 param, mgmt, sId, qId, tId, __VA_ARGS__) #define QW_SCH_TASK_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000 #define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000
......
...@@ -221,7 +221,7 @@ int32_t qwAddTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, ...@@ -221,7 +221,7 @@ int32_t qwAddTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId,
int32_t code = 0; int32_t code = 0;
QW_ERR_RET(qwAcquireAddScheduler(QW_READ, mgmt, sId, &tsch)); QW_ERR_RET(qwAcquireAddScheduler(QW_READ, mgmt, sId, &tsch));
QW_ERR_JRET(qwAddTaskImpl(mgmt, tsch, 0, qId, tId, JOB_TASK_STATUS_NOT_START, QW_EXIST_RET_ERR, NULL)); QW_ERR_JRET(qwAddTaskImpl(mgmt, tsch, 0, qId, tId, status, QW_EXIST_RET_ERR, NULL));
_return: _return:
...@@ -557,6 +557,7 @@ int32_t qwCancelDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_ ...@@ -557,6 +557,7 @@ int32_t qwCancelDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_
} }
QW_UNLOCK(QW_WRITE, &task->lock); QW_UNLOCK(QW_WRITE, &task->lock);
qwReleaseTask(QW_READ, sch); qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt); qwReleaseScheduler(QW_READ, mgmt);
...@@ -778,30 +779,35 @@ int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchRe ...@@ -778,30 +779,35 @@ int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchRe
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwCheckAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg, int32_t rspCode) { int32_t qwCheckAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SRpcMsg *pMsg) {
SQWSchStatus *sch = NULL; SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL; SQWTaskStatus *task = NULL;
int32_t code = 0; int32_t code = 0;
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch)); QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch));
QW_ERR_JRET(qwAcquireTask(mgmt, QW_READ, sch, queryId, taskId, &task)); QW_ERR_JRET(qwAcquireTask(mgmt, QW_READ, sch, qId, tId, &task));
QW_LOCK(QW_WRITE, &task->lock); QW_LOCK(QW_WRITE, &task->lock);
if (QW_READY_NOT_RECEIVED == task->ready) { if (QW_READY_NOT_RECEIVED == task->ready) {
QW_UNLOCK(QW_WRITE, &task->lock); QW_SCH_TASK_DLOG("ready not received, ready:%d", task->ready);
goto _return;
} else if (QW_READY_RECEIVED == task->ready) {
task->ready = QW_READY_RESPONSED;
int32_t rspCode = task->code;
QW_UNLOCK(QW_WRITE, &task->lock);
qwReleaseTask(QW_READ, sch); qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt); qwReleaseScheduler(QW_READ, mgmt);
return TSDB_CODE_SUCCESS; QW_ERR_RET(qwBuildAndSendReadyRsp(pMsg, rspCode));
} else if (QW_READY_RECEIVED == task->ready) {
QW_ERR_JRET(qwBuildAndSendReadyRsp(pMsg, rspCode));
task->ready = QW_READY_RESPONSED; QW_SCH_TASK_DLOG("ready response sent, ready:%d", task->ready);
return TSDB_CODE_SUCCESS;
} else if (QW_READY_RESPONSED == task->ready) { } else if (QW_READY_RESPONSED == task->ready) {
qError("query response already send"); QW_SCH_TASK_ELOG("ready response already send, ready:%d", task->ready);
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
} else { } else {
assert(0); assert(0);
...@@ -812,7 +818,6 @@ _return: ...@@ -812,7 +818,6 @@ _return:
if (task) { if (task) {
QW_UNLOCK(QW_WRITE, &task->lock); QW_UNLOCK(QW_WRITE, &task->lock);
qwReleaseTask(QW_READ, sch); qwReleaseTask(QW_READ, sch);
} }
qwReleaseScheduler(QW_READ, mgmt); qwReleaseScheduler(QW_READ, mgmt);
...@@ -820,34 +825,39 @@ _return: ...@@ -820,34 +825,39 @@ _return:
QW_RET(code); QW_RET(code);
} }
int32_t qwSetAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) { int32_t qwSetAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SRpcMsg *pMsg) {
SQWSchStatus *sch = NULL; SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL; SQWTaskStatus *task = NULL;
int32_t code = 0; int32_t code = 0;
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch)); QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch));
QW_ERR_JRET(qwAcquireTask(mgmt, QW_READ, sch, queryId, taskId, &task)); QW_ERR_JRET(qwAcquireTask(mgmt, QW_READ, sch, qId, tId, &task));
QW_LOCK(QW_WRITE, &task->lock); QW_LOCK(QW_WRITE, &task->lock);
if (QW_TASK_READY_RESP(task->status)) {
QW_ERR_JRET(qwBuildAndSendReadyRsp(pMsg, task->code));
int8_t status = task->status;
int32_t errCode = task->code;
if (QW_TASK_READY(status)) {
task->ready = QW_READY_RESPONSED; task->ready = QW_READY_RESPONSED;
QW_UNLOCK(QW_WRITE, &task->lock);
QW_ERR_JRET(qwBuildAndSendReadyRsp(pMsg, errCode));
QW_SCH_TASK_DLOG("task ready responsed, status:%d", status);
} else { } else {
task->ready = QW_READY_RECEIVED; task->ready = QW_READY_RECEIVED;
QW_UNLOCK(QW_WRITE, &task->lock);
qwReleaseTask(QW_READ, sch); QW_UNLOCK(QW_WRITE, &task->lock);
qwReleaseScheduler(QW_READ, mgmt);
return TSDB_CODE_SUCCESS; QW_SCH_TASK_DLOG("task ready NOT responsed, status:%d", status);
} }
_return: _return:
if (task) { if (task) {
QW_UNLOCK(QW_WRITE, &task->lock);
qwReleaseTask(QW_READ, sch); qwReleaseTask(QW_READ, sch);
} }
...@@ -873,21 +883,14 @@ int32_t qwCheckAndProcessTaskDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId ...@@ -873,21 +883,14 @@ int32_t qwCheckAndProcessTaskDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
QW_LOCK(QW_READ, &task->lock); if ((!atomic_load_8(&task->cancel)) && (!atomic_load_8(&task->drop))) {
QW_TASK_ELOG("no cancel or drop but task exists, status:%d", atomic_load_8(&task->status));
if ((!task->cancel) && (!task->drop)) {
QW_TASK_ELOG("no cancel or drop but task exists, status:%d", task->status);
QW_UNLOCK(QW_READ, &task->lock);
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
} }
QW_UNLOCK(QW_READ, &task->lock);
*needStop = true; *needStop = true;
if (task->cancel) { if (atomic_load_8(&task->cancel)) {
QW_LOCK(QW_WRITE, &task->lock); QW_LOCK(QW_WRITE, &task->lock);
code = qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &status, QW_IDS()); code = qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &status, QW_IDS());
QW_UNLOCK(QW_WRITE, &task->lock); QW_UNLOCK(QW_WRITE, &task->lock);
...@@ -929,13 +932,15 @@ int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint6 ...@@ -929,13 +932,15 @@ int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint6
QW_ERR_RET(code); QW_ERR_RET(code);
} }
if (task->cancel) {
QW_LOCK(QW_WRITE, &task->lock); QW_LOCK(QW_WRITE, &task->lock);
if (task->cancel) {
qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &newStatus, QW_IDS()); qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &newStatus, QW_IDS());
QW_UNLOCK(QW_WRITE, &task->lock);
} }
if (task->drop) { if (task->drop) {
QW_UNLOCK(QW_WRITE, &task->lock);
qwReleaseTask(QW_READ, sch); qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt); qwReleaseScheduler(QW_READ, mgmt);
...@@ -944,13 +949,13 @@ int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint6 ...@@ -944,13 +949,13 @@ int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint6
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if ((!(task->cancel || task->drop)) && status > 0) { if (!(task->cancel || task->drop)) {
QW_LOCK(QW_WRITE, &task->lock);
qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &status, QW_IDS()); qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &status, QW_IDS());
task->code = errCode; task->code = errCode;
QW_UNLOCK(QW_WRITE, &task->lock);
} }
QW_UNLOCK(QW_WRITE, &task->lock);
qwReleaseTask(QW_READ, sch); qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt); qwReleaseScheduler(QW_READ, mgmt);
...@@ -995,24 +1000,24 @@ int32_t qwScheduleDataSink(SQWTaskCtx *handles, SQWorkerMgmt *mgmt, uint64_t sId ...@@ -995,24 +1000,24 @@ int32_t qwScheduleDataSink(SQWTaskCtx *handles, SQWorkerMgmt *mgmt, uint64_t sId
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwScheduleQuery(SQWTaskCtx *handles, SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) { int32_t qwScheduleQuery(SQWTaskCtx *handles, SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SRpcMsg *pMsg) {
if (atomic_load_8(&handles->queryScheduled)) { if (atomic_load_8(&handles->queryScheduled)) {
qDebug("query already scheduled"); QW_SCH_TASK_ELOG("query already scheduled, queryScheduled:%d", handles->queryScheduled);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
QW_ERR_RET(qwUpdateTaskStatus(mgmt, sId, queryId, taskId, JOB_TASK_STATUS_EXECUTING)); QW_ERR_RET(qwUpdateTaskStatus(mgmt, sId, qId, tId, JOB_TASK_STATUS_EXECUTING));
SQueryContinueReq * req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq)); SQueryContinueReq * req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq));
if (NULL == req) { if (NULL == req) {
qError("rpcMallocCont %d failed", (int32_t)sizeof(SQueryContinueReq)); QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(SQueryContinueReq));
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
req->header.vgId = mgmt->nodeId; req->header.vgId = mgmt->nodeId;
req->sId = sId; req->sId = sId;
req->queryId = queryId; req->queryId = qId;
req->taskId = taskId; req->taskId = tId;
SRpcMsg pNewMsg = { SRpcMsg pNewMsg = {
.handle = pMsg->handle, .handle = pMsg->handle,
...@@ -1025,20 +1030,21 @@ int32_t qwScheduleQuery(SQWTaskCtx *handles, SQWorkerMgmt *mgmt, uint64_t sId, u ...@@ -1025,20 +1030,21 @@ int32_t qwScheduleQuery(SQWTaskCtx *handles, SQWorkerMgmt *mgmt, uint64_t sId, u
int32_t code = (*mgmt->putToQueueFp)(mgmt->nodeObj, &pNewMsg); int32_t code = (*mgmt->putToQueueFp)(mgmt->nodeObj, &pNewMsg);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
qError("put query continue msg to queue failed, code:%x", code); QW_SCH_TASK_ELOG("put query continue msg to queue failed, code:%x", code);
rpcFreeCont(req); rpcFreeCont(req);
QW_ERR_RET(code); QW_ERR_RET(code);
} }
handles->queryScheduled = true;
qDebug("put query continue msg to query queue"); QW_SCH_TASK_DLOG("put query continue msg to query queue, vgId:%d", mgmt->nodeId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwHandleFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) { int32_t qwHandleFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SRpcMsg *pMsg) {
SQWSchStatus *sch = NULL; SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL; SQWTaskStatus *task = NULL;
int32_t code = 0; int32_t code = 0;
...@@ -1049,25 +1055,29 @@ int32_t qwHandleFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64 ...@@ -1049,25 +1055,29 @@ int32_t qwHandleFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64
SRetrieveTableRsp *rsp = NULL; SRetrieveTableRsp *rsp = NULL;
bool queryEnd = false; bool queryEnd = false;
SQWTaskCtx *handles = NULL; SQWTaskCtx *handles = NULL;
int8_t status = 0;
QW_ERR_JRET(qwAcquireTaskCtx(QW_READ, mgmt, qId, tId, &handles));
QW_LOCK(QW_WRITE, &handles->lock);
QW_ERR_JRET(qwAcquireTaskCtx(QW_READ, mgmt, queryId, taskId, &handles)); if (handles->needRsp) {
if (atomic_load_8(&handles->needRsp)) { QW_UNLOCK(QW_WRITE, &handles->lock);
qError("last fetch not responsed"); QW_SCH_TASK_ELOG("last fetch not responsed, needRsp:%d", handles->needRsp);
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
} }
QW_ERR_JRET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch)); QW_UNLOCK(QW_WRITE, &handles->lock);
QW_ERR_JRET(qwAcquireTask(mgmt, QW_READ, sch, queryId, taskId, &task));
QW_LOCK(QW_READ, &task->lock); QW_ERR_JRET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch));
QW_ERR_JRET(qwAcquireTask(mgmt, QW_READ, sch, qId, tId, &task));
if (task->cancel || task->drop) { if (task->cancel || task->drop) {
qError("task is already cancelled or dropped"); QW_SCH_TASK_ELOG("task is already cancelled or dropped, cancel:%d, drop:%d", task->cancel, task->drop);
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
} }
if (task->status != JOB_TASK_STATUS_EXECUTING && task->status != JOB_TASK_STATUS_PARTIAL_SUCCEED) { if (task->status != JOB_TASK_STATUS_EXECUTING && task->status != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
qError("invalid status %d for fetch", task->status); QW_SCH_TASK_ELOG("invalid status %d for fetch", task->status);
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
} }
...@@ -1075,6 +1085,9 @@ int32_t qwHandleFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64 ...@@ -1075,6 +1085,9 @@ int32_t qwHandleFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64
if (dataLength > 0) { if (dataLength > 0) {
SOutputData output = {0}; SOutputData output = {0};
QW_SCH_TASK_DLOG("task got data in sink, dataLength:%d", dataLength);
QW_ERR_JRET(qwInitFetchRsp(dataLength, &rsp)); QW_ERR_JRET(qwInitFetchRsp(dataLength, &rsp));
output.pData = rsp->data; output.pData = rsp->data;
...@@ -1095,27 +1108,38 @@ int32_t qwHandleFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64 ...@@ -1095,27 +1108,38 @@ int32_t qwHandleFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64
if (DS_BUF_EMPTY == output.bufStatus && output.queryEnd) { if (DS_BUF_EMPTY == output.bufStatus && output.queryEnd) {
rsp->completed = 1; rsp->completed = 1;
QW_ERR_JRET(qwUpdateTaskStatus(mgmt, sId, queryId, taskId, JOB_TASK_STATUS_SUCCEED)); status = JOB_TASK_STATUS_SUCCEED;
QW_SCH_TASK_DLOG("task all fetched, status:%d", status);
QW_ERR_JRET(qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &status, QW_IDS()));
} }
// Note: schedule data sink firstly and will schedule query after it's done // Note: schedule data sink firstly and will schedule query after it's done
if (output.needSchedule) { if (output.needSchedule) {
QW_ERR_JRET(qwScheduleDataSink(handles, mgmt, sId, queryId, taskId, pMsg)); QW_SCH_TASK_DLOG("sink need schedule, queryEnd:%d", output.queryEnd);
QW_ERR_JRET(qwScheduleDataSink(handles, mgmt, sId, qId, tId, pMsg));
} else if ((!output.queryEnd) && (DS_BUF_LOW == output.bufStatus || DS_BUF_EMPTY == output.bufStatus)) { } else if ((!output.queryEnd) && (DS_BUF_LOW == output.bufStatus || DS_BUF_EMPTY == output.bufStatus)) {
QW_ERR_JRET(qwScheduleQuery(handles, mgmt, sId, queryId, taskId, pMsg)); QW_SCH_TASK_DLOG("task not end, need to continue, bufStatus:%d", output.bufStatus);
QW_ERR_JRET(qwScheduleQuery(handles, mgmt, sId, qId, tId, pMsg));
} }
} else { } else {
if (dataLength < 0) { if (dataLength < 0) {
qError("invalid length from dsGetDataLength, length:%d", dataLength); QW_SCH_TASK_ELOG("invalid length from dsGetDataLength, length:%d", dataLength);
QW_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
} }
if (queryEnd) { if (queryEnd) {
QW_ERR_JRET(qwUpdateTaskStatus(mgmt, sId, queryId, taskId, JOB_TASK_STATUS_SUCCEED)); status = JOB_TASK_STATUS_SUCCEED;
QW_SCH_TASK_DLOG("no data in sink and query end, dataLength:%d", dataLength);
QW_ERR_JRET(qwUpdateTaskInfo(mgmt, task, QW_TASK_INFO_STATUS, &status, QW_IDS()));
} else { } else {
assert(0 == handles->needRsp); assert(0 == handles->needRsp);
qDebug("no res data in sink, need response later"); // MUST IN SCHEDULE OR IN SINK SCHEDULE
QW_SCH_TASK_DLOG("no res data in sink, need response later, queryEnd:%d", queryEnd);
QW_LOCK(QW_WRITE, &handles->lock); QW_LOCK(QW_WRITE, &handles->lock);
handles->needRsp = true; handles->needRsp = true;
...@@ -1128,7 +1152,6 @@ int32_t qwHandleFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64 ...@@ -1128,7 +1152,6 @@ int32_t qwHandleFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64
_return: _return:
if (task) { if (task) {
QW_UNLOCK(QW_READ, &task->lock);
qwReleaseTask(QW_READ, sch); qwReleaseTask(QW_READ, sch);
} }
...@@ -1212,10 +1235,10 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -1212,10 +1235,10 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t code = 0; int32_t code = 0;
bool queryRsped = false; bool queryRsped = false;
bool needStop = false; bool needStop = false;
bool taskAdded = false;
struct SSubplan *plan = NULL; struct SSubplan *plan = NULL;
SSubQueryMsg *msg = pMsg->pCont; SSubQueryMsg *msg = pMsg->pCont;
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt; SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
int32_t rspCode = 0;
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
QW_ELOG("invalid query msg, contLen:%d", pMsg->contLen); QW_ELOG("invalid query msg, contLen:%d", pMsg->contLen);
...@@ -1238,6 +1261,8 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -1238,6 +1261,8 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
QW_ERR_RET(TSDB_CODE_QRY_TASK_CANCELLED); QW_ERR_RET(TSDB_CODE_QRY_TASK_CANCELLED);
} }
QW_ERR_JRET(qwAddTask(qWorkerMgmt, sId, qId, tId, JOB_TASK_STATUS_EXECUTING));
code = qStringToSubplan(msg->msg, &plan); code = qStringToSubplan(msg->msg, &plan);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
QW_TASK_ELOG("string to subplan failed, code:%d", code); QW_TASK_ELOG("string to subplan failed, code:%d", code);
...@@ -1248,53 +1273,49 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -1248,53 +1273,49 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
code = qCreateExecTask(node, 0, (struct SSubplan *)plan, &pTaskInfo); code = qCreateExecTask(node, 0, (struct SSubplan *)plan, &pTaskInfo);
if (code) { if (code) {
QW_TASK_ELOG("qCreateExecTask failed, code:%x", code); QW_TASK_ELOG("qCreateExecTask failed, code:%x", code);
QW_ERR_JRET(qwAddTask(qWorkerMgmt, sId, qId, tId, JOB_TASK_STATUS_FAILED));
QW_ERR_JRET(code); QW_ERR_JRET(code);
} }
QW_ERR_JRET(qwAddTask(qWorkerMgmt, sId, qId, tId, JOB_TASK_STATUS_EXECUTING));
taskAdded = true;
QW_ERR_JRET(qwBuildAndSendQueryRsp(pMsg, TSDB_CODE_SUCCESS)); QW_ERR_JRET(qwBuildAndSendQueryRsp(pMsg, TSDB_CODE_SUCCESS));
queryRsped = true; queryRsped = true;
DataSinkHandle sinkHandle = NULL; DataSinkHandle sinkHandle = NULL;
code = qExecTask(pTaskInfo, &sinkHandle); code = qExecTask(pTaskInfo, &sinkHandle);
if (code) { if (code) {
QW_TASK_ELOG("qExecTask failed, code:%x", code); QW_TASK_ELOG("qExecTask failed, code:%x", code);
QW_ERR_JRET(code); QW_ERR_JRET(code);
} }
QW_ERR_JRET(qwAddTaskHandlesToCache(qWorkerMgmt, msg->queryId, msg->taskId, pTaskInfo, sinkHandle)); QW_ERR_JRET(qwAddTaskHandlesToCache(qWorkerMgmt, msg->queryId, msg->taskId, pTaskInfo, sinkHandle));
QW_ERR_JRET(qwUpdateTaskStatus(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, JOB_TASK_STATUS_PARTIAL_SUCCEED));
_return: _return:
if (queryRsped) { if (code) {
code = qwCheckAndSendReadyRsp(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, pMsg, code); rspCode = code;
} else { }
code = qwBuildAndSendQueryRsp(pMsg, code);
if (!queryRsped) {
code = qwBuildAndSendQueryRsp(pMsg, rspCode);
if (TSDB_CODE_SUCCESS == rspCode && code) {
rspCode = code;
}
} }
int8_t status = 0; int8_t status = 0;
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != rspCode) {
status = JOB_TASK_STATUS_FAILED; status = JOB_TASK_STATUS_FAILED;
} else { } else {
status = JOB_TASK_STATUS_PARTIAL_SUCCEED; status = JOB_TASK_STATUS_PARTIAL_SUCCEED;
} }
if (!taskAdded) { qwQueryPostProcess(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, status, rspCode);
qwAddTask(qWorkerMgmt, sId, qId, tId, status);
status = -1; if (queryRsped) {
qwCheckAndSendReadyRsp(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, pMsg);
} }
qwQueryPostProcess(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, status, code); QW_RET(rspCode);
QW_RET(code);
} }
int32_t qWorkerProcessQueryContinueMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t qWorkerProcessQueryContinueMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
...@@ -1306,19 +1327,27 @@ int32_t qWorkerProcessQueryContinueMsg(void *node, void *qWorkerMgmt, SRpcMsg *p ...@@ -1306,19 +1327,27 @@ int32_t qWorkerProcessQueryContinueMsg(void *node, void *qWorkerMgmt, SRpcMsg *p
SQWTaskCtx *handles = NULL; SQWTaskCtx *handles = NULL;
QW_ERR_JRET(qwAcquireTaskCtx(QW_READ, qWorkerMgmt, req->queryId, req->taskId, &handles)); QW_ERR_JRET(qwAcquireTaskCtx(QW_READ, qWorkerMgmt, req->queryId, req->taskId, &handles));
QW_LOCK(QW_WRITE, &handles->lock);
qTaskInfo_t taskHandle = handles->taskHandle; qTaskInfo_t taskHandle = handles->taskHandle;
DataSinkHandle sinkHandle = handles->sinkHandle; DataSinkHandle sinkHandle = handles->sinkHandle;
bool needRsp = handles->needRsp;
QW_UNLOCK(QW_WRITE, &handles->lock);
qwReleaseTaskResCache(QW_READ, qWorkerMgmt); qwReleaseTaskResCache(QW_READ, qWorkerMgmt);
QW_ERR_JRET(qwCheckAndProcessTaskDrop(qWorkerMgmt, req->sId, req->queryId, req->taskId, &needStop)); QW_ERR_JRET(qwCheckAndProcessTaskDrop(qWorkerMgmt, req->sId, req->queryId, req->taskId, &needStop));
if (needStop) { if (needStop) {
qWarn("task need stop"); qWarn("task need stop");
if (needRsp) {
QW_ERR_JRET(qwAcquireTaskCtx(QW_READ, qWorkerMgmt, req->queryId, req->taskId, &handles));
QW_LOCK(QW_WRITE, &handles->lock);
if (handles->needRsp) {
qwBuildAndSendQueryRsp(pMsg, TSDB_CODE_QRY_TASK_CANCELLED); qwBuildAndSendQueryRsp(pMsg, TSDB_CODE_QRY_TASK_CANCELLED);
handles->needRsp = false;
} }
QW_UNLOCK(QW_WRITE, &handles->lock);
qwReleaseTaskResCache(QW_READ, qWorkerMgmt);
QW_ERR_RET(TSDB_CODE_QRY_TASK_CANCELLED); QW_ERR_RET(TSDB_CODE_QRY_TASK_CANCELLED);
} }
...@@ -1336,9 +1365,17 @@ int32_t qWorkerProcessQueryContinueMsg(void *node, void *qWorkerMgmt, SRpcMsg *p ...@@ -1336,9 +1365,17 @@ int32_t qWorkerProcessQueryContinueMsg(void *node, void *qWorkerMgmt, SRpcMsg *p
_return: _return:
if (needRsp) { QW_ERR_JRET(qwAcquireTaskCtx(QW_READ, qWorkerMgmt, req->queryId, req->taskId, &handles));
QW_LOCK(QW_WRITE, &handles->lock);
if (handles->needRsp) {
code = qwBuildAndSendQueryRsp(pMsg, code); code = qwBuildAndSendQueryRsp(pMsg, code);
handles->needRsp = false;
} }
handles->queryScheduled = false;
QW_UNLOCK(QW_WRITE, &handles->lock);
qwReleaseTaskResCache(QW_READ, qWorkerMgmt);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
status = JOB_TASK_STATUS_FAILED; status = JOB_TASK_STATUS_FAILED;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册