提交 50bbb874 编写于 作者: D dapan1121

feature/qnode

上级 e4bda74d
......@@ -439,52 +439,6 @@ _return:
QW_RET(code);
}
int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, bool *needRsp) {
int32_t code = 0;
SQWTaskCtx *ctx = NULL;
bool locked = false;
QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), QW_WRITE, &ctx));
QW_LOCK(QW_WRITE, &ctx->lock);
locked = true;
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task already dropping, phase:%d", ctx->phase);
QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
}
if (QW_IN_EXECUTOR(ctx)) {
QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx));
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING));
} else if (ctx->phase > 0) {
QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS()));
QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS()));
locked = false;
*needRsp = true;
}
if (locked) {
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
}
_return:
if (code) {
QW_SET_RSP_CODE(ctx, code);
}
if (locked) {
QW_UNLOCK(QW_WRITE, &ctx->lock);
}
QW_RET(code);
}
int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle, DataSinkHandle sinkHandle, int8_t taskType) {
int32_t code = 0;
bool qcontinue = true;
......@@ -619,11 +573,12 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ
int8_t status = 0;
SQWTaskCtx *ctx = NULL;
bool locked = false;
bool ctxAcquired = false;
void *readyConnection = NULL;
void *dropConnection = NULL;
void *cancelConnection = NULL;
QW_SCH_TASK_DLOG("handle event at phase %d", phase);
QW_SCH_TASK_DLOG("start to handle event at phase %d", phase);
switch (phase) {
case QW_PHASE_PRE_QUERY: {
......@@ -716,6 +671,7 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ
}
case QW_PHASE_PRE_FETCH: {
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), QW_READ, &ctx));
ctxAcquired = true;
QW_LOCK(QW_WRITE, &ctx->lock);
......@@ -816,6 +772,7 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQ
}
case QW_PHASE_PRE_CQUERY: {
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), QW_READ, &ctx));
ctxAcquired = true;
QW_LOCK(QW_WRITE, &ctx->lock);
......@@ -914,6 +871,10 @@ _return:
QW_UNLOCK(QW_WRITE, &ctx->lock);
}
if (ctxAcquired && ctx) {
qwReleaseTaskCtx(QW_READ, mgmt);
}
if (readyConnection) {
qwBuildAndSendReadyRsp(readyConnection, output->rspCode);
QW_TASK_DLOG("ready msg rsped, code:%x", output->rspCode);
......@@ -929,6 +890,7 @@ _return:
QW_TASK_DLOG("cancel msg rsped, code:%x", output->rspCode);
}
QW_SCH_TASK_DLOG("end to handle event at phase %d", phase);
QW_RET(code);
}
......@@ -1036,6 +998,7 @@ int32_t qwProcessReady(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
if (phase == QW_PHASE_PRE_QUERY) {
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_READY);
ctx->readyConnection = qwMsg->connection;
QW_TASK_DLOG("ready msg not rsped, phase:%d", phase);
} else if (phase == QW_PHASE_POST_QUERY) {
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
......@@ -1225,16 +1188,59 @@ _return:
int32_t qwProcessDrop(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg) {
int32_t code = 0;
bool needRsp = false;
SQWTaskCtx *ctx = NULL;
bool locked = false;
QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), QW_WRITE, &ctx));
QW_ERR_JRET(qwDropTask(QW_FPARAMS(), &needRsp));
QW_LOCK(QW_WRITE, &ctx->lock);
locked = true;
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task already dropping, phase:%d", ctx->phase);
QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION);
}
if (QW_IN_EXECUTOR(ctx)) {
QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx));
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING));
ctx->dropConnection = qwMsg->connection;
} else if (ctx->phase > 0) {
QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS()));
QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS()));
locked = false;
needRsp = true;
}
if (!needRsp) {
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
}
_return:
if (code) {
QW_SET_RSP_CODE(ctx, code);
}
if (locked) {
QW_UNLOCK(QW_WRITE, &ctx->lock);
}
if (ctx) {
qwReleaseTaskCtx(QW_WRITE, mgmt);
}
if (TSDB_CODE_SUCCESS != code || needRsp) {
QW_ERR_RET(qwBuildAndSendDropRsp(qwMsg->connection, code));
QW_TASK_DLOG("drop msg rsped, code:%x", code);
}
return TSDB_CODE_SUCCESS;
QW_RET(code);
}
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, void *nodeObj, putReqToQueryQFp fp) {
......
......@@ -129,17 +129,17 @@ int32_t qwtCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTas
int32_t idx = qwtTestCaseIdx % qwtTestCaseNum;
if (0 == idx) {
*pTaskInfo = qwtTestCaseIdx;
*handle = qwtTestCaseIdx+1;
*pTaskInfo = (qTaskInfo_t)qwtTestCaseIdx;
*handle = (DataSinkHandle)qwtTestCaseIdx+1;
} else if (1 == idx) {
*pTaskInfo = NULL;
*handle = NULL;
} else if (2 == idx) {
*pTaskInfo = qwtTestCaseIdx;
*pTaskInfo = (qTaskInfo_t)qwtTestCaseIdx;
*handle = NULL;
} else if (3 == idx) {
*pTaskInfo = NULL;
*handle = qwtTestCaseIdx;
*handle = (DataSinkHandle)qwtTestCaseIdx;
}
++qwtTestCaseIdx;
......@@ -460,6 +460,14 @@ void *controlThread(void *param) {
return NULL;
}
void *queryQueueThread(void *param) {
}
void *fetchQueueThread(void *param) {
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册