提交 54e59090 编写于 作者: D dapan1121

feature/qnode

上级 e6f05d38
......@@ -86,8 +86,6 @@ typedef struct SQWPhaseInput {
int8_t taskStatus;
int8_t taskType;
int32_t code;
qTaskInfo_t taskHandle;
DataSinkHandle sinkHandle;
} SQWPhaseInput;
typedef struct SQWPhaseOutput {
......
......@@ -456,13 +456,15 @@ _return:
QW_RET(code);
}
int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle, DataSinkHandle sinkHandle, int8_t taskType, bool shortRun) {
int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
int32_t code = 0;
bool qcontinue = true;
SSDataBlock* pRes = NULL;
uint64_t useconds = 0;
int32_t i = 0;
int32_t leftRun = QW_DEFAULT_SHORT_RUN_TIMES;
int32_t execNum = 0;
qTaskInfo_t *taskHandle = &ctx->taskHandle;
DataSinkHandle sinkHandle = ctx->sinkHandle;
while (true) {
QW_TASK_DLOG("start to execTask in executor, loopIdx:%d", i++);
......@@ -473,12 +475,14 @@ int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle, DataSinkHandle sinkH
QW_ERR_JRET(code);
}
++execNum;
if (NULL == pRes) {
QW_TASK_DLOG("task query done, useconds:%"PRIu64, useconds);
dsEndPut(sinkHandle, useconds);
if (TASK_TYPE_TEMP == taskType) {
if (TASK_TYPE_TEMP == ctx->taskType) {
qwFreeTaskHandle(QW_FPARAMS(), taskHandle);
}
......@@ -498,7 +502,11 @@ int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle, DataSinkHandle sinkH
break;
}
if (shortRun && ((--leftRun) <= 0)) {
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_READY) && execNum >= QW_DEFAULT_SHORT_RUN_TIMES) {
break;
}
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
break;
}
}
......@@ -617,8 +625,6 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
case QW_PHASE_PRE_QUERY: {
atomic_store_8(&ctx->phase, phase);
atomic_store_8(&ctx->taskType, input->taskType);
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL) || QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_TASK_ELOG("task already cancelled/dropped at wrong phase, phase:%d", phase);
......@@ -842,9 +848,6 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
}
if (QW_PHASE_POST_QUERY == phase) {
ctx->taskHandle = input->taskHandle;
ctx->sinkHandle = input->sinkHandle;
if (NULL == ctx->taskHandle && NULL == ctx->sinkHandle) {
ctx->emptyRes = true;
}
......@@ -949,8 +952,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
SQWPhaseOutput output = {0};
qTaskInfo_t pTaskInfo = NULL;
DataSinkHandle sinkHandle = NULL;
input.taskType = taskType;
SQWTaskCtx *ctx = NULL;
QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, &output));
......@@ -961,6 +963,10 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
QW_TASK_DLOG("task need stop, phase:%d", QW_PHASE_PRE_QUERY);
QW_ERR_JRET(code);
}
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
atomic_store_8(&ctx->taskType, taskType);
code = qStringToSubplan(qwMsg->msg, &plan);
if (TSDB_CODE_SUCCESS != code) {
......@@ -986,8 +992,11 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
queryRsped = true;
atomic_store_ptr(&ctx->taskHandle, pTaskInfo);
atomic_store_ptr(&ctx->sinkHandle, sinkHandle);
if (pTaskInfo && sinkHandle) {
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), &pTaskInfo, sinkHandle, taskType, true));
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx));
}
_return:
......@@ -1002,8 +1011,6 @@ _return:
}
input.code = rspCode;
input.taskHandle = pTaskInfo;
input.sinkHandle = sinkHandle;
input.taskStatus = rspCode ? JOB_TASK_STATUS_FAILED : JOB_TASK_STATUS_PARTIAL_SUCCEED;
QW_ERR_RET(qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, &output));
......@@ -1095,7 +1102,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
DataSinkHandle sinkHandle = ctx->sinkHandle;
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), &ctx->taskHandle, sinkHandle, ctx->taskType, QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)));
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx));
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
SOutputData sOutput = {0};
......
......@@ -722,7 +722,7 @@ void *queryQueueThread(void *param) {
while (true) {
tsem_wait(&qwtTestQuerySem);
if (qwtTestStop && qwtTestQueryQueueNum <= 0) {
if (qwtTestStop && qwtTestQueryQueueNum <= 0 && qwtTestCaseFinished) {
break;
}
......@@ -761,7 +761,7 @@ void *queryQueueThread(void *param) {
free(queryRpc);
if (qwtTestStop && qwtTestQueryQueueNum <= 0) {
if (qwtTestStop && qwtTestQueryQueueNum <= 0 && qwtTestCaseFinished) {
break;
}
}
......@@ -779,6 +779,10 @@ void *fetchQueueThread(void *param) {
while (true) {
tsem_wait(&qwtTestFetchSem);
if (qwtTestStop && qwtTestFetchQueueNum <= 0 && qwtTestCaseFinished) {
break;
}
taosWLockLatch(&qwtTestFetchQueueLock);
if (qwtTestFetchQueueNum <= 0 || qwtTestFetchQueueRIdx == qwtTestFetchQueueWIdx) {
printf("Fetch queue is empty\n");
......@@ -826,7 +830,7 @@ void *fetchQueueThread(void *param) {
free(fetchRpc);
if (qwtTestStop && qwtTestFetchQueueNum <= 0) {
if (qwtTestStop && qwtTestFetchQueueNum <= 0 && qwtTestCaseFinished) {
break;
}
}
......@@ -1104,10 +1108,17 @@ TEST(rcTest, shortExecshortDelay) {
break;
}
sleep(3);
sleep(1);
if (qwtTestCaseFinished) {
if (qwtTestQuitThreadNum < 3) {
tsem_post(&qwtTestQuerySem);
tsem_post(&qwtTestFetchSem);
usleep(10);
}
}
tsem_post(&qwtTestQuerySem);
usleep(10);
}
qwtTestQueryQueueNum = 0;
......@@ -1179,10 +1190,17 @@ TEST(rcTest, longExecshortDelay) {
break;
}
sleep(3);
sleep(1);
if (qwtTestCaseFinished) {
if (qwtTestQuitThreadNum < 3) {
tsem_post(&qwtTestQuerySem);
tsem_post(&qwtTestFetchSem);
usleep(10);
}
}
tsem_post(&qwtTestQuerySem);
usleep(10);
}
qwtTestQueryQueueNum = 0;
......@@ -1255,10 +1273,17 @@ TEST(rcTest, shortExeclongDelay) {
break;
}
sleep(3);
sleep(1);
if (qwtTestCaseFinished) {
if (qwtTestQuitThreadNum < 3) {
tsem_post(&qwtTestQuerySem);
tsem_post(&qwtTestFetchSem);
usleep(10);
}
}
tsem_post(&qwtTestQuerySem);
usleep(10);
}
qwtTestQueryQueueNum = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册