提交 48d85c27 编写于 作者: H Haojun Liao

Merge branch '3.0' into feature/3_liaohj

...@@ -66,7 +66,7 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { ...@@ -66,7 +66,7 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
int32_t code = 0; int32_t code = 0;
bool qcontinue = true; bool qcontinue = true;
SSDataBlock *pRes = NULL; SSDataBlock *pRes = NULL;
...@@ -104,8 +104,8 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { ...@@ -104,8 +104,8 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
QW_ERR_RET(qwHandleTaskComplete(QW_FPARAMS(), ctx)); QW_ERR_RET(qwHandleTaskComplete(QW_FPARAMS(), ctx));
if (queryEnd) { if (queryStop) {
*queryEnd = true; *queryStop = true;
} }
break; break;
...@@ -125,6 +125,10 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { ...@@ -125,6 +125,10 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", rows, qcontinue); QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", rows, qcontinue);
if (!qcontinue) { if (!qcontinue) {
if (queryStop) {
*queryStop = true;
}
break; break;
} }
...@@ -566,7 +570,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ...@@ -566,7 +570,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
SQWPhaseInput input = {0}; SQWPhaseInput input = {0};
void *rsp = NULL; void *rsp = NULL;
int32_t dataLen = 0; int32_t dataLen = 0;
bool queryEnd = false; bool queryStop = false;
do { do {
QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, NULL)); QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, NULL));
...@@ -576,7 +580,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ...@@ -576,7 +580,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
atomic_store_8((int8_t *)&ctx->queryInQueue, 0); atomic_store_8((int8_t *)&ctx->queryInQueue, 0);
atomic_store_8((int8_t *)&ctx->queryContinue, 0); atomic_store_8((int8_t *)&ctx->queryContinue, 0);
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryEnd)); QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryStop));
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
SOutputData sOutput = {0}; SOutputData sOutput = {0};
...@@ -627,7 +631,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { ...@@ -627,7 +631,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
} }
QW_LOCK(QW_WRITE, &ctx->lock); QW_LOCK(QW_WRITE, &ctx->lock);
if (queryEnd || code || 0 == atomic_load_8((int8_t *)&ctx->queryContinue)) { if (queryStop || code || 0 == atomic_load_8((int8_t *)&ctx->queryContinue)) {
// Note: query is not running anymore // Note: query is not running anymore
QW_SET_PHASE(ctx, 0); QW_SET_PHASE(ctx, 0);
QW_UNLOCK(QW_WRITE, &ctx->lock); QW_UNLOCK(QW_WRITE, &ctx->lock);
......
...@@ -224,7 +224,7 @@ ...@@ -224,7 +224,7 @@
# ---- stream # ---- stream
./test.sh -f tsim/stream/basic0.sim ./test.sh -f tsim/stream/basic0.sim
#./test.sh -f tsim/stream/basic1.sim ./test.sh -f tsim/stream/basic1.sim
./test.sh -f tsim/stream/basic2.sim ./test.sh -f tsim/stream/basic2.sim
./test.sh -f tsim/stream/drop_stream.sim ./test.sh -f tsim/stream/drop_stream.sim
./test.sh -f tsim/stream/distributeInterval0.sim ./test.sh -f tsim/stream/distributeInterval0.sim
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册