提交 8b061f52 编写于 作者: D dapan1121

feature/qnode

上级 eed1e2f9
......@@ -81,8 +81,10 @@ typedef struct SQWMsg {
} SQWMsg;
typedef struct SQWPhaseInput {
int8_t status;
int32_t code;
int8_t status;
int32_t code;
qTaskInfo_t taskHandle;
DataSinkHandle sinkHandle;
} SQWPhaseInput;
typedef struct SQWPhaseOutput {
......@@ -102,13 +104,9 @@ typedef struct SQWTaskCtx {
int32_t phase;
int32_t sinkId;
int8_t queryInQ;
int32_t readyCode;
int8_t events[QW_EVENT_MAX];
int8_t ready;
int8_t cancel;
int8_t drop;
int8_t needRsp;
qTaskInfo_t taskHandle;
DataSinkHandle sinkHandle;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_QWORKER_MSG_H_
#define _TD_QWORKER_MSG_H_
#ifdef __cplusplus
extern "C" {
#endif
#include "qworkerInt.h"
#include "dataSinkMgt.h"
int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg);
int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg);
int32_t qwProcessReady(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg);
int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg);
int32_t qwProcessDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg);
int32_t qwBuildAndSendDropRsp(void *connection, int32_t code);
int32_t qwBuildAndSendFetchRsp(void *connection, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code);
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len);
int32_t qwBuildAndSendCQueryMsg(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, void *connection);
int32_t qwBuildAndSendSchSinkMsg(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, void *connection);
int32_t qwBuildAndSendReadyRsp(void *connection, int32_t code);
int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code);
void qwFreeFetchRsp(void *msg);
int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp);
#ifdef __cplusplus
}
#endif
#endif /*_TD_QWORKER_INT_H_*/
......@@ -4,6 +4,7 @@
#include "planner.h"
#include "query.h"
#include "qworkerInt.h"
#include "qworkerMsg.h"
#include "tmsg.h"
#include "tname.h"
#include "dataSinkMgt.h"
......@@ -130,7 +131,7 @@ int32_t qwAcquireSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, u
QW_UNLOCK(rwType, &mgmt->schLock);
if (QW_NOT_EXIST_ADD == nOpt) {
QW_ERR_RET(qwAddSchedulerImpl(rwType, mgmt, sId, sch));
QW_ERR_RET(qwAddSchedulerImpl(QW_FPARAMS(), rwType, sch));
nOpt = QW_NOT_EXIST_RET_ERR;
......@@ -149,17 +150,34 @@ int32_t qwAcquireSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, u
}
int32_t qwAcquireAddScheduler(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWSchStatus **sch) {
return qwAcquireSchedulerImpl(rwType, mgmt, sId, sch, QW_NOT_EXIST_ADD);
return qwAcquireSchedulerImpl(QW_FPARAMS(), rwType, sch, QW_NOT_EXIST_ADD);
}
int32_t qwAcquireScheduler(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWSchStatus **sch) {
return qwAcquireSchedulerImpl(rwType, mgmt, sId, sch, QW_NOT_EXIST_RET_ERR);
return qwAcquireSchedulerImpl(QW_FPARAMS(), rwType, sch, QW_NOT_EXIST_RET_ERR);
}
void qwReleaseScheduler(int32_t rwType, SQWorkerMgmt *mgmt) {
QW_UNLOCK(rwType, &mgmt->schLock);
}
int32_t qwAcquireTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWSchStatus *sch, SQWTaskStatus **task) {
char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId);
QW_LOCK(rwType, &sch->tasksLock);
*task = taosHashGet(sch->tasksHash, id, sizeof(id));
if (NULL == (*task)) {
QW_UNLOCK(rwType, &sch->tasksLock);
QW_ERR_RET(TSDB_CODE_QRY_TASK_NOT_EXIST);
}
return TSDB_CODE_SUCCESS;
}
int32_t qwAddTaskStatusImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWSchStatus *sch, int32_t rwType, int32_t status, SQWTaskStatus **task) {
int32_t code = 0;
......@@ -209,21 +227,6 @@ _return:
}
int32_t qwAcquireTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWSchStatus *sch, SQWTaskStatus **task) {
char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId);
QW_LOCK(rwType, &sch->tasksLock);
*task = taosHashGet(sch->tasksHash, id, sizeof(id));
if (NULL == (*task)) {
QW_UNLOCK(rwType, &sch->tasksLock);
QW_ERR_RET(TSDB_CODE_QRY_TASK_NOT_EXIST);
}
return TSDB_CODE_SUCCESS;
}
int32_t qwAddAcquireTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWSchStatus *sch, int32_t status, SQWTaskStatus **task) {
return qwAddTaskStatusImpl(QW_FPARAMS(), sch, rwType, status, task);
}
......@@ -233,14 +236,30 @@ void qwReleaseTaskStatus(int32_t rwType, SQWSchStatus *sch) {
QW_UNLOCK(rwType, &sch->tasksLock);
}
int32_t qwAcquireTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWTaskCtx **ctx) {
char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId);
QW_LOCK(rwType, &mgmt->ctxLock);
*ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
if (NULL == (*ctx)) {
QW_UNLOCK(rwType, &mgmt->ctxLock);
QW_TASK_ELOG("ctx not in ctxHash, id:%s", id);
QW_ERR_RET(TSDB_CODE_QRY_RES_CACHE_NOT_EXIST);
}
return TSDB_CODE_SUCCESS;
}
int32_t qwAddTaskCtxImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, int32_t status, SQWTaskCtx **ctx) {
char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId);
SQWTaskCtx ctx = {0};
SQWTaskCtx nctx = {0};
QW_LOCK(QW_WRITE, &mgmt->ctxLock);
int32_t code = taosHashPut(mgmt->ctxHash, id, sizeof(id), &ctx, sizeof(SQWTaskCtx));
int32_t code = taosHashPut(mgmt->ctxHash, id, sizeof(id), &nctx, sizeof(SQWTaskCtx));
if (0 != code) {
QW_UNLOCK(QW_WRITE, &mgmt->ctxLock);
......@@ -283,20 +302,6 @@ int32_t qwGetTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tI
}
int32_t qwAcquireTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWTaskCtx **ctx) {
char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId);
QW_LOCK(rwType, &mgmt->ctxLock);
*ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
if (NULL == (*ctx)) {
QW_UNLOCK(rwType, &mgmt->ctxLock);
QW_TASK_ELOG("ctx not in ctxHash, id:%s", id);
QW_ERR_RET(TSDB_CODE_QRY_RES_CACHE_NOT_EXIST);
}
return TSDB_CODE_SUCCESS;
}
int32_t qwAddAcquireTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int32_t rwType, SQWTaskCtx **ctx) {
return qwAddTaskCtxImpl(QW_FPARAMS(), rwType, 0, ctx);
......@@ -375,7 +380,7 @@ int32_t qwDropTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
QW_TASK_DLOG("task dropped, id:%d", id);
QW_TASK_DLOG("task dropped, id:%s", id);
_return:
......@@ -385,27 +390,13 @@ _return:
QW_RET(code);
}
int32_t qwUpdateTaskCtxHandles(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, qTaskInfo_t taskHandle, DataSinkHandle sinkHandle) {
SQWTaskCtx *ctx = NULL;
QW_ERR_RET(qwAcquireTaskCtx(QW_FPARAMS(), QW_READ, &ctx));
ctx->taskHandle = taskHandle;
ctx->sinkHandle = sinkHandle;
qwReleaseTaskCtx(QW_READ, mgmt);
return TSDB_CODE_SUCCESS;
}
int32_t qwUpdateTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int8_t status) {
SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL;
int32_t code = 0;
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch));
QW_ERR_JRET(qwAcquireTaskStatus(mgmt, QW_READ, sch, qId, tId, &task));
QW_ERR_RET(qwAcquireScheduler(QW_FPARAMS(), QW_READ, &sch));
QW_ERR_JRET(qwAcquireTaskStatus(QW_FPARAMS(), QW_READ, sch, &task));
QW_ERR_JRET(qwSetTaskStatus(QW_FPARAMS(), task, status));
......@@ -430,7 +421,7 @@ int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId,
locked = true;
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task already dropping", NULL);
QW_TASK_WLOG("task already dropping, phase:%d", ctx->phase);
QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
}
......@@ -488,7 +479,7 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
QW_ERR_RET(code);
}
QW_TASK_DLOG("no data in sink and query end", NULL);
QW_TASK_DLOG("no data in sink and query end, phase:%d", ctx->phase);
QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED));
......@@ -546,6 +537,8 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
SQWTaskCtx *ctx = NULL;
bool locked = false;
QW_SCH_TASK_DLOG("handle event at phase %d", phase);
switch (phase) {
case QW_PHASE_PRE_QUERY: {
QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), QW_READ, &ctx));
......@@ -587,6 +580,11 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
locked = true;
ctx->taskHandle = input->taskHandle;
ctx->sinkHandle = input->sinkHandle;
ctx->readyCode = input->code;
assert(!QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL));
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
......@@ -610,14 +608,14 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED;
} else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_READY)) {
output->needRsp = true;
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
output->rspCode = input.code;
output->rspCode = input->code;
}
if (!output->needStop) {
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), input.status));
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), input->status));
}
break;
}
......@@ -626,36 +624,36 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
QW_LOCK(QW_WRITE, &ctx->lock);
locked = true;
locked = true;
ctx->phase = phase;
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) {
QW_TASK_WLOG("task already cancelled", NULL);
QW_TASK_WLOG("task already cancelled, phase:%d", phase);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
}
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task is dropping", NULL);
QW_TASK_WLOG("task is dropping, phase:%d", phase);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_DROPPING;
} else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) {
QW_TASK_WLOG("task is cancelling", NULL);
QW_TASK_WLOG("task is cancelling, phase:%d", phase);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLING;
}
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
QW_TASK_WLOG("last fetch not finished", NULL);
QW_TASK_WLOG("last fetch not finished, phase:%d", phase);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_DUPLICATTED_OPERATION;
QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
}
if (!QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_READY)) {
QW_TASK_ELOG("query rsp are not ready", NULL);
QW_TASK_ELOG("query rsp are not ready, phase:%d", phase);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_MSG_ERROR;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_MSG_ERROR);
......@@ -670,18 +668,18 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
locked = true;
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) {
QW_TASK_WLOG("task already cancelled", NULL);
QW_TASK_WLOG("task already cancelled, phase:%d", phase);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
}
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task is dropping", NULL);
QW_TASK_WLOG("task is dropping, phase:%d", phase);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_DROPPING;
} else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) {
QW_TASK_WLOG("task is cancelling", NULL);
QW_TASK_WLOG("task is cancelling, phase:%d", phase);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLING;
}
......@@ -722,7 +720,7 @@ int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
code = output.rspCode;
if (needStop) {
QW_TASK_DLOG("task need stop", NULL);
QW_TASK_DLOG("task need stop, phase:%d", QW_PHASE_PRE_QUERY);
QW_ERR_JRET(code);
}
......@@ -733,13 +731,13 @@ int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
}
qTaskInfo_t pTaskInfo = NULL;
code = qCreateExecTask(node, 0, (struct SSubplan *)plan, &pTaskInfo);
code = qCreateExecTask(qwMsg->node, 0, (struct SSubplan *)plan, &pTaskInfo);
if (code) {
QW_TASK_ELOG("qCreateExecTask failed, code:%x", code);
QW_ERR_JRET(code);
}
QW_ERR_JRET(qwBuildAndSendQueryRsp(pMsg, TSDB_CODE_SUCCESS));
QW_ERR_JRET(qwBuildAndSendQueryRsp(qwMsg->connection, TSDB_CODE_SUCCESS));
queryRsped = true;
......@@ -750,8 +748,6 @@ int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
QW_ERR_JRET(code);
}
QW_ERR_JRET(qwUpdateTaskCtxHandles(QW_FPARAMS(), pTaskInfo, sinkHandle));
_return:
if (code) {
......@@ -770,6 +766,8 @@ _return:
}
input.code = rspCode;
input.taskHandle = pTaskInfo;
input.sinkHandle = sinkHandle;
if (TSDB_CODE_SUCCESS != rspCode) {
input.status = JOB_TASK_STATUS_FAILED;
......@@ -786,6 +784,33 @@ _return:
QW_RET(rspCode);
}
int32_t qwProcessReady(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg) {
int32_t code = 0;
SQWTaskCtx *ctx = NULL;
QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), QW_READ, &ctx));
QW_LOCK(QW_WRITE, &ctx->lock);
if (ctx->phase == QW_PHASE_PRE_QUERY) {
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_READY);
} else if (ctx->phase == QW_PHASE_POST_QUERY) {
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
QW_ERR_JRET(qwBuildAndSendReadyRsp(qwMsg->connection, ctx->readyCode));
}
_return:
if (ctx) {
QW_UNLOCK(QW_WRITE, &ctx->lock);
qwReleaseTaskCtx(QW_READ, mgmt);
}
QW_RET(code);
}
int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg) {
int32_t code = 0;
bool queryRsped = false;
......@@ -804,7 +829,7 @@ int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
code = output.rspCode;
if (needStop) {
QW_TASK_DLOG("task need stop", NULL);
QW_TASK_DLOG("task need stop, phase:%d", QW_PHASE_PRE_CQUERY);
QW_ERR_JRET(code);
}
......@@ -819,22 +844,18 @@ int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
QW_ERR_JRET(code);
}
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CQUERY);
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CQUERY);
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
SOutputData sOutput = {0};
QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
if (NULL == rsp) {
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH);
}
// Note: schedule data sink firstly and will schedule query after it's done
if (sOutput.scheduleJobNo) {
if (sOutput.scheduleJobNo > ctx.sinkId) {
if (sOutput.scheduleJobNo > ctx->sinkId) {
QW_TASK_DLOG("sink need schedule, scheduleJobNo:%d", sOutput.scheduleJobNo);
ctx.sinkId = sOutput.scheduleJobNo;
ctx->sinkId = sOutput.scheduleJobNo;
QW_ERR_JRET(qwBuildAndSendSchSinkMsg(QW_FPARAMS(), qwMsg->connection));
}
} else if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) {
......@@ -859,14 +880,15 @@ _return:
qwHandleTaskEvent(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, &output);
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
if (code) {
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
qwFreeFetchRsp(rsp);
rsp = NULL;
qwBuildAndSendFetchRsp(qwMsg->connection, rsp, 0, code);
} else if (rsp) {
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
qwBuildAndSendFetchRsp(qwMsg->connection, rsp, dataLen, code);
}
}
......@@ -897,7 +919,7 @@ int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
code = output.rspCode;
if (needStop) {
QW_TASK_DLOG("task need stop", NULL);
QW_TASK_DLOG("task need stop, phase:%d", QW_PHASE_PRE_FETCH);
QW_ERR_JRET(code);
}
......@@ -912,13 +934,13 @@ int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
// Note: schedule data sink firstly and will schedule query after it's done
if (sOutput.scheduleJobNo) {
if (sOutput.scheduleJobNo > ctx.sinkId) {
if (sOutput.scheduleJobNo > ctx->sinkId) {
QW_TASK_DLOG("sink need schedule, scheduleJobNo:%d", sOutput.scheduleJobNo);
ctx.sinkId = sOutput.scheduleJobNo;
ctx->sinkId = sOutput.scheduleJobNo;
QW_ERR_JRET(qwBuildAndSendSchSinkMsg(QW_FPARAMS(), qwMsg->connection));
}
} else if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) {
} else if ((!sOutput.queryEnd) && (/* DS_BUF_LOW == sOutput.bufStatus || */ DS_BUF_EMPTY == sOutput.bufStatus)) {
QW_TASK_DLOG("task not end, need to continue, bufStatus:%d", sOutput.bufStatus);
if (!QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CQUERY)) {
......@@ -1037,28 +1059,11 @@ void qWorkerDestroy(void **qWorkerMgmt) {
tfree(*qWorkerMgmt);
}
#if 0
#endif
int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SSchedulerStatusRsp **rsp) {
SQWSchStatus *sch = NULL;
int32_t taskNum = 0;
/*
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch));
sch->lastAccessTs = taosGetTimestampSec();
......@@ -1096,6 +1101,7 @@ int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint
qwReleaseScheduler(QW_READ, mgmt);
(*rsp)->num = taskNum;
*/
return TSDB_CODE_SUCCESS;
}
......@@ -1105,12 +1111,13 @@ int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint
int32_t qwUpdateSchLastAccess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId) {
SQWSchStatus *sch = NULL;
/*
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch));
sch->lastAccessTs = taosGetTimestampSec();
qwReleaseScheduler(QW_READ, mgmt);
*/
return TSDB_CODE_SUCCESS;
}
......@@ -1119,7 +1126,8 @@ int32_t qwGetTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL;
int32_t code = 0;
/*
if (qwAcquireScheduler(QW_READ, mgmt, sId, &sch)) {
*taskStatus = JOB_TASK_STATUS_NULL;
return TSDB_CODE_SUCCESS;
......@@ -1136,6 +1144,7 @@ int32_t qwGetTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
qwReleaseTask(QW_READ, sch);
qwReleaseScheduler(QW_READ, mgmt);
*/
QW_RET(code);
}
......@@ -1146,6 +1155,7 @@ int32_t qwCancelTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tI
SQWTaskStatus *task = NULL;
int32_t code = 0;
/*
QW_ERR_RET(qwAcquireAddScheduler(QW_READ, mgmt, sId, &sch));
QW_ERR_JRET(qwAcquireAddTask(mgmt, QW_READ, sch, qId, tId, JOB_TASK_STATUS_NOT_START, &task));
......@@ -1193,6 +1203,7 @@ _return:
if (sch) {
qwReleaseScheduler(QW_READ, mgmt);
}
*/
QW_RET(code);
}
......
......@@ -4,6 +4,7 @@
#include "planner.h"
#include "query.h"
#include "qworkerInt.h"
#include "qworkerMsg.h"
#include "tmsg.h"
#include "tname.h"
#include "dataSinkMgt.h"
......@@ -25,7 +26,9 @@ int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp) {
return TSDB_CODE_SUCCESS;
}
void qwBuildFetchRsp(SRetrieveTableRsp *rsp, SOutputData *input, int32_t len) {
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len) {
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
rsp->useconds = htobe64(input->useconds);
rsp->completed = input->queryEnd;
rsp->precision = input->precision;
......@@ -262,48 +265,6 @@ int32_t qwBuildAndSendSchSinkMsg(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId,
}
int32_t qwSetAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SRpcMsg *pMsg) {
SQWSchStatus *sch = NULL;
SQWTaskStatus *task = NULL;
int32_t code = 0;
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, sId, &sch));
QW_ERR_JRET(qwAcquireTask(mgmt, QW_READ, sch, qId, tId, &task));
QW_LOCK(QW_WRITE, &task->lock);
int8_t status = task->status;
int32_t errCode = task->code;
if (QW_TASK_READY(status)) {
task->ready = QW_READY_RESPONSED;
QW_UNLOCK(QW_WRITE, &task->lock);
QW_ERR_JRET(qwBuildAndSendReadyRsp(pMsg, errCode));
QW_SCH_TASK_DLOG("task ready responsed, status:%d", status);
} else {
task->ready = QW_READY_RECEIVED;
QW_UNLOCK(QW_WRITE, &task->lock);
QW_SCH_TASK_DLOG("task ready NOT responsed, status:%d", status);
}
_return:
if (task) {
qwReleaseTask(QW_READ, sch);
}
qwReleaseScheduler(QW_READ, mgmt);
QW_RET(code);
}
int32_t qwBuildAndSendCQueryMsg(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, void *connection) {
SRpcMsg *pMsg = (SRpcMsg *)connection;
SQueryContinueReq * req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq));
......@@ -349,7 +310,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
QW_ELOG("invalid query msg, contLen:%d", pMsg->contLen);
QW_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
msg->sId = be64toh(msg->sId);
......@@ -373,16 +334,16 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SQueryContinueReq *msg = (SQueryContinueReq *)pMsg->pCont;
bool needStop = false;
SQWTaskCtx *handles = NULL;
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
QW_ELOG("invalid cquery msg, contLen:%d", pMsg->contLen);
QW_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
msg->sId = be64toh(msg->sId);
msg->queryId = be64toh(msg->queryId);
msg->taskId = be64toh(msg->taskId);
msg->contentLen = ntohl(msg->contentLen);
uint64_t sId = msg->sId;
uint64_t qId = msg->queryId;
......@@ -423,11 +384,17 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
msg->sId = htobe64(msg->sId);
msg->queryId = htobe64(msg->queryId);
msg->taskId = htobe64(msg->taskId);
msg->sId = be64toh(msg->sId);
msg->queryId = be64toh(msg->queryId);
msg->taskId = be64toh(msg->taskId);
uint64_t sId = msg->sId;
uint64_t qId = msg->queryId;
uint64_t tId = msg->taskId;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connection = pMsg};
QW_ERR_RET(qwSetAndSendReadyRsp(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, pMsg));
QW_ERR_RET(qwProcessReady(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId, &qwMsg));
return TSDB_CODE_SUCCESS;
}
......@@ -448,7 +415,7 @@ int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SSchedulerStatusRsp *sStatus = NULL;
QW_ERR_JRET(qwGetSchTasksStatus(qWorkerMgmt, msg->sId, &sStatus));
//QW_ERR_JRET(qwGetSchTasksStatus(qWorkerMgmt, msg->sId, &sStatus));
_return:
......@@ -469,9 +436,9 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
msg->sId = htobe64(msg->sId);
msg->queryId = htobe64(msg->queryId);
msg->taskId = htobe64(msg->taskId);
msg->sId = be64toh(msg->sId);
msg->queryId = be64toh(msg->queryId);
msg->taskId = be64toh(msg->taskId);
uint64_t sId = msg->sId;
uint64_t qId = msg->queryId;
......@@ -498,7 +465,7 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
msg->queryId = htobe64(msg->queryId);
msg->taskId = htobe64(msg->taskId);
QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId));
//QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->sId, msg->queryId, msg->taskId));
_return:
......@@ -517,13 +484,13 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
if (NULL == msg || pMsg->contLen < sizeof(*msg)) {
QW_ELOG("invalid task drop msg", NULL);
QW_ELOG("invalid task drop msg, msg:%p, msgLen:%d", msg, pMsg->contLen);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
msg->sId = htobe64(msg->sId);
msg->queryId = htobe64(msg->queryId);
msg->taskId = htobe64(msg->taskId);
msg->sId = be64toh(msg->sId);
msg->queryId = be64toh(msg->queryId);
msg->taskId = be64toh(msg->taskId);
uint64_t sId = msg->sId;
uint64_t qId = msg->queryId;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册