提交 b79313ec 编写于 作者: 5 54liuyao

pause&resume

上级 d429e070
...@@ -183,6 +183,9 @@ SArray *mmGetMsgHandles() { ...@@ -183,6 +183,9 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
...@@ -538,6 +538,8 @@ SArray *vmGetMsgHandles() { ...@@ -538,6 +538,8 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
...@@ -67,6 +67,8 @@ int32_t mndInitStream(SMnode *pMnode) { ...@@ -67,6 +67,8 @@ int32_t mndInitStream(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_PAUSE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_RESUME_RSP, mndTransProcessRsp);
// mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr); // mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr);
// mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint); // mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint);
......
...@@ -1223,7 +1223,7 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms ...@@ -1223,7 +1223,7 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
tqDebug("vgId:%d s-task:%s set normal flag", pTq->pStreamMeta->vgId, pTask->id.idStr); tqDebug("vgId:%d s-task:%s set normal flag", pTq->pStreamMeta->vgId, pTask->id.idStr);
streamSetStatusNormal(pTask); streamSetStatusNormal(pTask);
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
tqStreamTasksScanWal(pTq); tqStartStreamTasks(pTq);
} }
return 0; return 0;
} }
......
...@@ -410,12 +410,12 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp ...@@ -410,12 +410,12 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
} }
} break; } break;
case TDMT_STREAM_TASK_PAUSE: { case TDMT_STREAM_TASK_PAUSE: {
if (tqProcessTaskPauseReq(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) { if (pVnode->restored && tqProcessTaskPauseReq(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) {
goto _err; goto _err;
} }
} break; } break;
case TDMT_STREAM_TASK_RESUME: { case TDMT_STREAM_TASK_RESUME: {
if (tqProcessTaskResumeReq(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) { if (pVnode->restored && tqProcessTaskResumeReq(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) {
goto _err; goto _err;
} }
} break; } break;
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print ===== step1
sql drop stream if exists streams1;
sql drop database if exists test;
sql create database test vgroups 10;
sql use test;
sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int);
sql create table ts1 using st tags(1,1,1);
sql create table ts2 using st tags(2,2,2);
sql create table ts3 using st tags(3,2,2);
sql create table ts4 using st tags(4,2,2);
sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt1 as select _wstart, count(*) c1, sum(a) c3 from st interval(10s);
sql pause stream streams1;
sql insert into ts1 values(1648791213001,1,12,3,1.0);
sql insert into ts2 values(1648791213001,1,12,3,1.0);
sql insert into ts3 values(1648791213001,1,12,3,1.0);
sql insert into ts4 values(1648791213001,1,12,3,1.0);
sleep 1000
print 1 select * from streamt1;
sql select * from streamt1;
if $rows != 0 then
print =1====rows=$rows
print $data00 $data01 $data02
print $data10 $data11 $data12
print $data20 $data21 $data22
return -1
endi
sql resume stream streams1;
$loop_count = 0
loop0:
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
sleep 500
print 2 select * from streamt1;
sql select * from streamt1;
if $rows != 1 then
print =====rows=$rows
print $data00 $data01 $data02
print $data10 $data11 $data12
print $data20 $data21 $data22
goto loop0
endi
if $data01 != 4 then
print =====data01=$data01
goto loop0
endi
sql insert into ts1 values(1648791223002,2,2,3,1.1);
sql insert into ts2 values(1648791223002,3,2,3,2.1);
sql insert into ts3 values(1648791223002,4,2,43,73.1);
sql insert into ts4 values(1648791223002,24,22,23,4.1);
$loop_count = 0
loop1:
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
sleep 500
print 3 select * from streamt1;
sql select * from streamt1;
if $rows != 2 then
print =====rows=$rows
print $data00 $data01 $data02
print $data10 $data11 $data12
print $data20 $data21 $data22
goto loop1
endi
if $data01 != 4 then
print =====data01=$data01
goto loop1
endi
if $data11 != 4 then
print =====data01=$data01
goto loop1
endi
print ===== step 1 over
print ===== step2
sql drop stream if exists streams2;
sql drop database if exists test2;
sql create database test2 vgroups 1;
sql use test2;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt2 as select _wstart, count(*) c1, sum(a) c3 from t1 interval(10s);
sql pause stream streams2;
sql insert into t1 values(1648791213001,1,12,3,1.0);
sleep 1000
print 1 select * from streamt2;
sql select * from streamt2;
if $rows != 0 then
print =1====rows=$rows
print $data00 $data01 $data02
print $data10 $data11 $data12
print $data20 $data21 $data22
return -1
endi
sql resume stream streams2;
$loop_count = 0
loop10:
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
sleep 500
print 2 select * from streamt2;
sql select * from streamt2;
if $rows != 1 then
print =====rows=$rows
print $data00 $data01 $data02
print $data10 $data11 $data12
print $data20 $data21 $data22
goto loop10
endi
if $data01 != 1 then
print =====data01=$data01
goto loop10
endi
sql insert into t1 values(1648791223002,2,2,3,1.1);
$loop_count = 0
loop2:
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
sleep 500
print 3 select * from streamt2;
sql select * from streamt2;
if $rows != 2 then
print =====rows=$rows
print $data00 $data01 $data02
print $data10 $data11 $data12
print $data20 $data21 $data22
goto loop2
endi
if $data01 != 1 then
print =====data01=$data01
goto loop2
endi
if $data11 != 1 then
print =====data01=$data01
goto loop2
endi
print ===== step 2 over
print ===== step3
sql drop stream if exists streams3;
sql drop database if exists test3;
sql create database test3 vgroups 10;
sql use test3;
sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int);
sql create table ts1 using st tags(1,1,1);
sql create table ts2 using st tags(2,2,2);
sql create table ts3 using st tags(3,2,2);
sql create table ts4 using st tags(4,2,2);
sql create stream streams3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt3 as select _wstart, count(*) c1, sum(a) c3 from st interval(10s);
sql create stream streams4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt4 as select _wstart, count(*) c1, sum(a) c3 from st interval(10s);
sql create stream streams5 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt5 as select _wstart, count(*) c1, sum(a) c3 from ts1 interval(10s);
sql pause stream streams3;
sql insert into ts1 values(1648791213001,1,12,3,1.0);
sql insert into ts2 values(1648791213001,1,12,3,1.0);
sql insert into ts3 values(1648791213001,1,12,3,1.0);
sql insert into ts4 values(1648791213001,1,12,3,1.0);
$loop_count = 0
loop3:
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
sleep 500
print 1 select * from streamt4;
sql select * from streamt4;
if $rows != 1 then
print =====rows=$rows
print $data00 $data01 $data02
print $data10 $data11 $data12
print $data20 $data21 $data22
goto loop3
endi
print 2 select * from streamt5;
sql select * from streamt5;
if $rows != 1 then
print =====rows=$rows
print $data00 $data01 $data02
print $data10 $data11 $data12
print $data20 $data21 $data22
goto loop3
endi
print 3 select * from streamt3;
sql select * from streamt3;
if $rows != 0 then
print =====rows=$rows
return -1
endi
print ===== step 3 over
print ===== step 4
sql_error pause stream streams3333333;
sql pause stream IF EXISTS streams44444;
sql_error resume stream streams5555555;
sql resume stream IF EXISTS streams66666666;
print ===== step 4 over
system sh/stop_dnodes.sh
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册