提交 d8a1f15d 编写于 作者: L liuyao

fix resum bug

上级 2fe494c2
...@@ -279,6 +279,7 @@ typedef struct SCheckpointInfo { ...@@ -279,6 +279,7 @@ typedef struct SCheckpointInfo {
typedef struct SStreamStatus { typedef struct SStreamStatus {
int8_t taskStatus; int8_t taskStatus;
int8_t schedStatus; int8_t schedStatus;
int8_t keepTaskStatus;
} SStreamStatus; } SStreamStatus;
struct SStreamTask { struct SStreamTask {
...@@ -539,6 +540,7 @@ int32_t streamTryExec(SStreamTask* pTask); ...@@ -539,6 +540,7 @@ int32_t streamTryExec(SStreamTask* pTask);
int32_t streamSchedExec(SStreamTask* pTask); int32_t streamSchedExec(SStreamTask* pTask);
int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock); int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock);
bool streamTaskShouldStop(const SStreamStatus* pStatus); bool streamTaskShouldStop(const SStreamStatus* pStatus);
bool streamTaskShouldPause(const SStreamStatus* pStatus);
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz); int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz);
......
...@@ -1221,6 +1221,7 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ...@@ -1221,6 +1221,7 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
if (pTask) { if (pTask) {
tqDebug("vgId:%d s-task:%s set pause flag", pTq->pStreamMeta->vgId, pTask->id.idStr); tqDebug("vgId:%d s-task:%s set pause flag", pTq->pStreamMeta->vgId, pTask->id.idStr);
atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE); atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
} }
...@@ -1231,7 +1232,7 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms ...@@ -1231,7 +1232,7 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg; SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
if (pTask) { if (pTask) {
streamSetStatusNormal(pTask); atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus);
// no lock needs to secure the access of the version // no lock needs to secure the access of the version
if (pReq->igUntreated) { // discard all the data when the stream task is suspended. if (pReq->igUntreated) { // discard all the data when the stream task is suspended.
......
...@@ -106,7 +106,7 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { ...@@ -106,7 +106,7 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
} }
if (streamTaskShouldStop(&pTask->status) || status == TASK_STATUS__RECOVER_PREPARE || if (streamTaskShouldStop(&pTask->status) || status == TASK_STATUS__RECOVER_PREPARE ||
status == TASK_STATUS__WAIT_DOWNSTREAM || status == TASK_STATUS__PAUSE) { status == TASK_STATUS__WAIT_DOWNSTREAM || streamTaskShouldPause(&pTask->status)) {
tqDebug("s-task:%s not ready for new submit block from wal, status:%d", pTask->id.idStr, status); tqDebug("s-task:%s not ready for new submit block from wal, status:%d", pTask->id.idStr, status);
streamMetaReleaseTask(pStreamMeta, pTask); streamMetaReleaseTask(pStreamMeta, pTask);
continue; continue;
......
...@@ -52,7 +52,7 @@ void streamCleanUp() { ...@@ -52,7 +52,7 @@ void streamCleanUp() {
void streamSchedByTimer(void* param, void* tmrId) { void streamSchedByTimer(void* param, void* tmrId) {
SStreamTask* pTask = (void*)param; SStreamTask* pTask = (void*)param;
if (streamTaskShouldStop(&pTask->status)) { if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
streamMetaReleaseTask(NULL, pTask); streamMetaReleaseTask(NULL, pTask);
return; return;
} }
......
...@@ -22,6 +22,11 @@ bool streamTaskShouldStop(const SStreamStatus* pStatus) { ...@@ -22,6 +22,11 @@ bool streamTaskShouldStop(const SStreamStatus* pStatus) {
return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING); return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING);
} }
bool streamTaskShouldPause(const SStreamStatus* pStatus) {
int32_t status = atomic_load_8((int8_t*) &pStatus->taskStatus);
return (status == TASK_STATUS__PAUSE);
}
static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) { static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
void* pExecutor = pTask->exec.pExecutor; void* pExecutor = pTask->exec.pExecutor;
...@@ -141,7 +146,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { ...@@ -141,7 +146,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
int32_t batchCnt = 0; int32_t batchCnt = 0;
while (1) { while (1) {
if (streamTaskShouldStop(&pTask->status)) { if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) {
taosArrayDestroy(pRes); taosArrayDestroy(pRes);
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册