提交 1367552f 编写于 作者: H Haojun Liao

fix(stream): refactor the halt status to check more status.

上级 0b6e1a12
...@@ -610,6 +610,9 @@ int32_t streamRestoreParam(SStreamTask* pTask); ...@@ -610,6 +610,9 @@ int32_t streamRestoreParam(SStreamTask* pTask);
int32_t streamSetStatusNormal(SStreamTask* pTask); int32_t streamSetStatusNormal(SStreamTask* pTask);
const char* streamGetTaskStatusStr(int32_t status); const char* streamGetTaskStatusStr(int32_t status);
void streamTaskPause(SStreamTask* pTask); void streamTaskPause(SStreamTask* pTask);
void streamTaskResume(SStreamTask* pTask);
void streamTaskHalt(SStreamTask* pTask);
void streamTaskResumeFromHalt(SStreamTask* pTask);
void streamTaskDisablePause(SStreamTask* pTask); void streamTaskDisablePause(SStreamTask* pTask);
void streamTaskEnablePause(SStreamTask* pTask); void streamTaskEnablePause(SStreamTask* pTask);
......
...@@ -1156,12 +1156,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ...@@ -1156,12 +1156,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
taosMsleep(100); taosMsleep(100);
} }
// todo fix the race condition, when pause msg is received from mnode, add lock here streamTaskHalt(pTask);
// now we can stop the stream task execution // now we can stop the stream task execution
ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__NORMAL); // todo upgrade the statu to be HALT from PAUSE or NORMAL
pStreamTask->status.taskStatus = TASK_STATUS__HALT; pStreamTask->status.taskStatus = TASK_STATUS__HALT;
// todo disable the pause
tqDebug("s-task:%s level:%d status is set to halt by fill-history task:%s", pStreamTask->id.idStr, tqDebug("s-task:%s level:%d status is set to halt by fill-history task:%s", pStreamTask->id.idStr,
pStreamTask->info.taskLevel, id); pStreamTask->info.taskLevel, id);
...@@ -1522,11 +1521,14 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, ...@@ -1522,11 +1521,14 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
return -1; return -1;
} }
if (streamTaskShouldPause(&pTask->status)) { // todo: handle the case: resume from halt to pause/ from halt to normal/ from pause to normal
atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus); streamTaskResume(pTask);
int32_t level = pTask->info.taskLevel;
int8_t status = pTask->status.taskStatus;
if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__SCAN_HISTORY) {
// no lock needs to secure the access of the version // no lock needs to secure the access of the version
if (igUntreated && pTask->info.taskLevel == TASK_LEVEL__SOURCE && !pTask->info.fillHistory) { if (igUntreated && level == TASK_LEVEL__SOURCE && !pTask->info.fillHistory) {
// discard all the data when the stream task is suspended. // discard all the data when the stream task is suspended.
walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion); walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion);
tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64 tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64
...@@ -1537,9 +1539,9 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, ...@@ -1537,9 +1539,9 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus); vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
} }
if (pTask->info.fillHistory && pTask->info.taskLevel == TASK_LEVEL__SOURCE) { if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory) {
streamStartRecoverTask(pTask, igUntreated); streamStartRecoverTask(pTask, igUntreated);
} else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && taosQueueItemSize(pTask->inputQueue->queue) == 0) { } else if (level == TASK_LEVEL__SOURCE && (taosQueueItemSize(pTask->inputQueue->queue) == 0)) {
tqStartStreamTasks(pTq); tqStartStreamTasks(pTq);
} else { } else {
streamSchedExec(pTask); streamSchedExec(pTask);
......
...@@ -345,7 +345,7 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) { ...@@ -345,7 +345,7 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
double el = (taosGetTimestampMs() - st) / 1000.0; double el = (taosGetTimestampMs() - st) / 1000.0;
if (el > 0) { if (el > 0) {
qDebug("s-task:%s wait for stream task:%s for %.2fs to handle all data in inputQ", pTask->id.idStr, qDebug("s-task:%s wait for stream task:%s for %.2fs to be idle", pTask->id.idStr,
pStreamTask->id.idStr, el); pStreamTask->id.idStr, el);
} }
} }
...@@ -377,13 +377,13 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { ...@@ -377,13 +377,13 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
} else { } else {
ASSERT(status == TASK_STATUS__SCAN_HISTORY); ASSERT(status == TASK_STATUS__SCAN_HISTORY);
pStreamTask->status.taskStatus = TASK_STATUS__HALT; pStreamTask->status.taskStatus = TASK_STATUS__HALT;
qDebug("s-task:%s halt by related fill history task:%s", pStreamTask->id.idStr, pTask->id.idStr); qDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, pTask->id.idStr);
} }
// wait for the stream task to handle all in the inputQ, and to be idle // wait for the stream task to handle all in the inputQ, and to be idle
waitForTaskIdle(pTask, pStreamTask); waitForTaskIdle(pTask, pStreamTask);
// In case of sink tasks, no need to be halted for them. // In case of sink tasks, no need to halt them.
// In case of source tasks and agg tasks, we should HALT them, and wait for them to be idle. And then, it's safe to // In case of source tasks and agg tasks, we should HALT them, and wait for them to be idle. And then, it's safe to
// start the task state transfer procedure. // start the task state transfer procedure.
// When a task is idle with halt status, all data in inputQ are consumed. // When a task is idle with halt status, all data in inputQ are consumed.
...@@ -405,8 +405,7 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { ...@@ -405,8 +405,7 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
streamTaskReleaseState(pTask); streamTaskReleaseState(pTask);
streamTaskReloadState(pStreamTask); streamTaskReloadState(pStreamTask);
// reset the status of stream task streamTaskResumeFromHalt(pStreamTask);
streamSetStatusNormal(pStreamTask);
pTask->status.taskStatus = TASK_STATUS__DROPPING; pTask->status.taskStatus = TASK_STATUS__DROPPING;
qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr); qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr);
......
...@@ -774,7 +774,7 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) { ...@@ -774,7 +774,7 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
pRange->range.minVer = 0; pRange->range.minVer = 0;
pRange->range.maxVer = ver; pRange->range.maxVer = ver;
qDebug("s-task:%s level:%d related-fill-history task exists, update stream calc time window:%" PRId64 " - %" PRId64 qDebug("s-task:%s level:%d related fill-history task exists, update stream calc time window:%" PRId64 " - %" PRId64
", verRang:%" PRId64 " - %" PRId64, ", verRang:%" PRId64 " - %" PRId64,
pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, pRange->range.minVer,
pRange->range.maxVer); pRange->range.maxVer);
...@@ -806,6 +806,7 @@ void streamTaskCheckDownstreamTasks(SStreamTask* pTask) { ...@@ -806,6 +806,7 @@ void streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
streamTaskDoCheckDownstreamTasks(pTask); streamTaskDoCheckDownstreamTasks(pTask);
} }
// normal -> pause, pause/stop/dropping -> pause, halt -> pause, scan-history -> pause
void streamTaskPause(SStreamTask* pTask) { void streamTaskPause(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta; SStreamMeta* pMeta = pTask->pMeta;
...@@ -824,11 +825,17 @@ void streamTaskPause(SStreamTask* pTask) { ...@@ -824,11 +825,17 @@ void streamTaskPause(SStreamTask* pTask) {
} }
while(!pTask->status.pauseAllowed || (pTask->status.taskStatus == TASK_STATUS__HALT)) { while(!pTask->status.pauseAllowed || (pTask->status.taskStatus == TASK_STATUS__HALT)) {
if (pTask->status.taskStatus == TASK_STATUS__DROPPING) { status = pTask->status.taskStatus;
if (status == TASK_STATUS__DROPPING) {
qDebug("vgId:%d s-task:%s task already dropped, do nothing", pMeta->vgId, pTask->id.idStr); qDebug("vgId:%d s-task:%s task already dropped, do nothing", pMeta->vgId, pTask->id.idStr);
return; return;
} }
if (status == TASK_STATUS__STOP || status == TASK_STATUS__PAUSE) {
qDebug("vgId:%d s-task:%s task already stopped/paused, status:%s, do nothing", pMeta->vgId, pTask->id.idStr, str);
return;
}
qDebug("s-task:%s wait for the task can be paused, vgId:%d", pTask->id.idStr, pMeta->vgId); qDebug("s-task:%s wait for the task can be paused, vgId:%d", pTask->id.idStr, pMeta->vgId);
taosMsleep(100); taosMsleep(100);
} }
...@@ -841,6 +848,17 @@ void streamTaskPause(SStreamTask* pTask) { ...@@ -841,6 +848,17 @@ void streamTaskPause(SStreamTask* pTask) {
streamGetTaskStatusStr(pTask->status.keepTaskStatus), (int32_t)el); streamGetTaskStatusStr(pTask->status.keepTaskStatus), (int32_t)el);
} }
void streamTaskResume(SStreamTask* pTask) {
int8_t status = pTask->status.taskStatus;
if (status == TASK_STATUS__PAUSE) {
pTask->status.taskStatus = pTask->status.keepTaskStatus;
pTask->status.keepTaskStatus = TASK_STATUS__NORMAL;
qDebug("s-task:%s resume from pause", pTask->id.idStr);
} else {
qError("s-task:%s not in pause, failed to resume, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status));
}
}
// todo fix race condition // todo fix race condition
void streamTaskDisablePause(SStreamTask* pTask) { void streamTaskDisablePause(SStreamTask* pTask) {
// pre-condition check // pre-condition check
...@@ -858,3 +876,38 @@ void streamTaskEnablePause(SStreamTask* pTask) { ...@@ -858,3 +876,38 @@ void streamTaskEnablePause(SStreamTask* pTask) {
qDebug("s-task:%s enable task pause", pTask->id.idStr); qDebug("s-task:%s enable task pause", pTask->id.idStr);
pTask->status.pauseAllowed = 1; pTask->status.pauseAllowed = 1;
} }
void streamTaskHalt(SStreamTask* pTask) {
int8_t status = pTask->status.taskStatus;
if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) {
return;
}
if (status == TASK_STATUS__HALT) {
return;
}
// upgrade to halt status
if (status == TASK_STATUS__PAUSE) {
qDebug("s-task:%s upgrade status to %s from %s", pTask->id.idStr, streamGetTaskStatusStr(TASK_STATUS__HALT),
streamGetTaskStatusStr(TASK_STATUS__PAUSE));
} else {
qDebug("s-task:%s halt task", pTask->id.idStr);
}
pTask->status.keepTaskStatus = status;
pTask->status.taskStatus = TASK_STATUS__HALT;
}
void streamTaskResumeFromHalt(SStreamTask* pTask) {
const char* id = pTask->id.idStr;
int8_t status = pTask->status.taskStatus;
if (status != TASK_STATUS__HALT) {
qError("s-task:%s not in halt status, status:%s", id, streamGetTaskStatusStr(status));
return;
}
pTask->status.taskStatus = pTask->status.keepTaskStatus;
pTask->status.keepTaskStatus = TASK_STATUS__NORMAL;
qDebug("s-task:%s resume from halt, current status:%s", id, streamGetTaskStatusStr(pTask->status.taskStatus));
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册