未验证 提交 c5cbd258 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #21367 from taosdata/fix/TD-24167

fix pause agg task
......@@ -1306,7 +1306,7 @@ int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) {
int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j);
if (pTask->taskLevel == TASK_LEVEL__SOURCE && mndPauseStreamTask(pTrans, pTask) < 0) {
if (pTask->taskLevel != TASK_LEVEL__SINK && mndPauseStreamTask(pTrans, pTask) < 0) {
return -1;
}
}
......@@ -1430,7 +1430,7 @@ int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUn
int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j);
if (pTask->taskLevel == TASK_LEVEL__SOURCE && mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) {
if (pTask->taskLevel != TASK_LEVEL__SINK && mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) {
return -1;
}
}
......
......@@ -1449,7 +1449,11 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
}
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
tqStartStreamTasks(pTq);
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
tqStartStreamTasks(pTq);
} else {
streamSchedExec(pTask);
}
}
return 0;
......
......@@ -269,6 +269,9 @@ int32_t streamExecForAll(SStreamTask* pTask) {
qDebug("s-task:%s start to extract data block from inputQ", pTask->id.idStr);
while (1) {
if (streamTaskShouldPause(&pTask->status)) {
return 0;
}
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
if (qItem == NULL) {
if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) {
......
......@@ -307,4 +307,63 @@ sql resume stream IF EXISTS streams66666666;
print ===== step 4 over
print ===== step5
sql drop stream if exists streams6;
sql drop database if exists test6;
sql create database test6 vgroups 10;
sql use test6;
sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int);
sql create table ts1 using st tags(1,1,1);
sql create table ts2 using st tags(2,2,2);
sql create table ts3 using st tags(3,2,2);
sql create table ts4 using st tags(4,2,2);
sql create stream streams6 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt6 as select _wstart, count(*) c1 from st interval(10s);
sql insert into ts1 values(1648791213001,1,12,3,1.0);
sql insert into ts2 values(1648791213001,1,12,3,1.0);
sql insert into ts3 values(1648791213001,1,12,3,1.0);
sql insert into ts4 values(1648791213001,1,12,3,1.0);
sleep 1000
sql pause stream streams6;
sleep 1000
sql insert into ts1 values(1648791223001,1,12,3,1.0);
sql insert into ts2 values(1648791233001,1,12,3,1.0);
sql resume stream streams6;
sql insert into ts3 values(1648791243001,1,12,3,1.0);
sql insert into ts4 values(1648791253001,1,12,3,1.0);
$loop_count = 0
loop6:
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
sleep 500
print 2 select * from streamt6;
sql select * from streamt6;
if $rows != 5 then
print =====rows=$rows
print $data00 $data01 $data02
print $data10 $data11 $data12
print $data20 $data21 $data22
print $data30 $data31 $data32
print $data40 $data41 $data42
print $data50 $data51 $data52
goto loop6
endi
print ===== step5 over
system sh/stop_dnodes.sh
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册