提交 641c707b 编写于 作者: L liuyao

pause&resume

上级 5bd8fe54
......@@ -52,6 +52,7 @@ enum {
TASK_STATUS__RECOVER1,
TASK_STATUS__RECOVER2,
TASK_STATUS__RESTORE, // only available for source task to replay WAL from the checkpoint
TASK_STATUS__PAUSE,
};
enum {
......
......@@ -1289,7 +1289,7 @@ static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) {
memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet));
action.pCont = pReq;
action.contLen = sizeof(SVPauseStreamTaskReq);
action.msgType = TDMT_STREAM_TASK_RESUME;
action.msgType = TDMT_STREAM_TASK_PAUSE;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
return -1;
......
......@@ -1207,13 +1207,24 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgL
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg;
// todo(liuyao) call task api
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
if (pTask) {
tqDebug("vgId:%d s-task:%s set pause flag", pTq->pStreamMeta->vgId, pTask->id.idStr);
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
}
return 0;
}
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
// todo(liuyao) call task api
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
if (pTask) {
tqDebug("vgId:%d s-task:%s set normal flag", pTq->pStreamMeta->vgId, pTask->id.idStr);
streamSetStatusNormal(pTask);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
tqStreamTasksScanWal(pTq);
}
return 0;
}
......
......@@ -106,7 +106,7 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
}
if (streamTaskShouldStop(&pTask->status) || status == TASK_STATUS__RECOVER_PREPARE ||
status == TASK_STATUS__WAIT_DOWNSTREAM) {
status == TASK_STATUS__WAIT_DOWNSTREAM || status == TASK_STATUS__PAUSE) {
tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr, status);
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
......
......@@ -76,6 +76,21 @@ sql select * from streamt5 order by group_id, ts;
if $rows != 14 then
print ====streamt5=rows5=$rows
print $data00,$data01,$data02,$data03
print $data10,$data11,$data12,$data13
print $data20,$data21,$data22,$data23
print $data30,$data31,$data32,$data33
print $data40,$data41,$data42,$data43
print $data50,$data51,$data52,$data53
print $data60,$data61,$data62,$data63
print $data70,$data71,$data72,$data73
print $data80,$data81,$data82,$data83
print $data90,$data91,$data92,$data93
print $data[10][0],$data[10][1],$data[10][2],$data[10][3]
print $data[11][0],$data[11][1],$data[10][2],$data[10][3]
print $data[12][0],$data[12][1],$data[10][2],$data[10][3]
print $data[13][0],$data[13][1],$data[10][2],$data[10][3]
print $data[14][0],$data[14][1],$data[10][2],$data[10][3]
goto loop2
endi
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册