diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 68b8dd72019df1ed623170249272e3cda3c5ad1a..0713150b486d953ccb42eb6acd5e907251d268d6 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1306,7 +1306,7 @@ int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) { int32_t sz = taosArrayGetSize(pTasks); for (int32_t j = 0; j < sz; j++) { SStreamTask *pTask = taosArrayGetP(pTasks, j); - if (pTask->taskLevel == TASK_LEVEL__SOURCE && mndPauseStreamTask(pTrans, pTask) < 0) { + if (pTask->taskLevel != TASK_LEVEL__SINK && mndPauseStreamTask(pTrans, pTask) < 0) { return -1; } } @@ -1430,7 +1430,7 @@ int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUn int32_t sz = taosArrayGetSize(pTasks); for (int32_t j = 0; j < sz; j++) { SStreamTask *pTask = taosArrayGetP(pTasks, j); - if (pTask->taskLevel == TASK_LEVEL__SOURCE && mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) { + if (pTask->taskLevel != TASK_LEVEL__SINK && mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) { return -1; } } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 017948001190bd9d6f5f009dc1ae9d181bb2c59c..e61cc2fdb143b41614aeba70d392e85e560a0539 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1449,7 +1449,11 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms } streamMetaReleaseTask(pTq->pStreamMeta, pTask); - tqStartStreamTasks(pTq); + if (pTask->taskLevel == TASK_LEVEL__SOURCE) { + tqStartStreamTasks(pTq); + } else { + streamSchedExec(pTask); + } } return 0; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index e05660da3268f6075e8c17344d3d16bd33341462..f64b24ed7e78bbd0ef0f3f9709e4e609f6cb7b5e 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -269,6 +269,9 @@ int32_t streamExecForAll(SStreamTask* pTask) { qDebug("s-task:%s start to extract data block from inputQ", pTask->id.idStr); while (1) { + if (streamTaskShouldPause(&pTask->status)) { + return 0; + } SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); if (qItem == NULL) { if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) { diff --git a/tests/script/tsim/stream/pauseAndResume.sim b/tests/script/tsim/stream/pauseAndResume.sim index fa7be19310a3a673b0330b9fa7b0849ae5a6714d..402e0086f7d3797eb66a7da89b92b97885e147b5 100644 --- a/tests/script/tsim/stream/pauseAndResume.sim +++ b/tests/script/tsim/stream/pauseAndResume.sim @@ -307,4 +307,63 @@ sql resume stream IF EXISTS streams66666666; print ===== step 4 over +print ===== step5 +sql drop stream if exists streams6; +sql drop database if exists test6; +sql create database test6 vgroups 10; +sql use test6; +sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int); +sql create table ts1 using st tags(1,1,1); +sql create table ts2 using st tags(2,2,2); +sql create table ts3 using st tags(3,2,2); +sql create table ts4 using st tags(4,2,2); +sql create stream streams6 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt6 as select _wstart, count(*) c1 from st interval(10s); + +sql insert into ts1 values(1648791213001,1,12,3,1.0); +sql insert into ts2 values(1648791213001,1,12,3,1.0); + +sql insert into ts3 values(1648791213001,1,12,3,1.0); +sql insert into ts4 values(1648791213001,1,12,3,1.0); + +sleep 1000 + +sql pause stream streams6; + +sleep 1000 + + +sql insert into ts1 values(1648791223001,1,12,3,1.0); +sql insert into ts2 values(1648791233001,1,12,3,1.0); + +sql resume stream streams6; + +sql insert into ts3 values(1648791243001,1,12,3,1.0); +sql insert into ts4 values(1648791253001,1,12,3,1.0); + +$loop_count = 0 +loop6: + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +sleep 500 + +print 2 select * from streamt6; +sql select * from streamt6; + +if $rows != 5 then + print =====rows=$rows + print $data00 $data01 $data02 + print $data10 $data11 $data12 + print $data20 $data21 $data22 + print $data30 $data31 $data32 + print $data40 $data41 $data42 + print $data50 $data51 $data52 + goto loop6 +endi + +print ===== step5 over + system sh/stop_dnodes.sh