提交 936afeb5 编写于 作者: H Haojun Liao

refactor: do internal refactor to simple the pause/resume in case of fill history exists.

上级 d5974a8f
......@@ -45,7 +45,6 @@ enum {
TASK_STATUS__FAIL,
TASK_STATUS__STOP,
TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner
// TASK_STATUS__SCAN_HISTORY_WAL, // scan history data in wal
TASK_STATUS__HALT, // pause, but not be manipulated by user command
TASK_STATUS__PAUSE, // pause
};
......
......@@ -1092,7 +1092,13 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
tqDebug("s-task:%s start scan-history stage(step 1), status:%s", id, pStatus);
if (pTask->tsInfo.step1Start == 0) {
ASSERT(pTask->status.pauseAllowed == false);
pTask->tsInfo.step1Start = taosGetTimestampMs();
if (pTask->info.fillHistory == 1) {
streamTaskEnablePause(pTask);
}
} else {
tqDebug("s-task:%s resume from paused, start ts:%"PRId64, pTask->id.idStr, pTask->tsInfo.step1Start);
}
// we have to continue retrying to successfully execute the scan history task.
......@@ -1106,14 +1112,13 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
return 0;
}
ASSERT(pTask->status.pauseAllowed == false);
if (pTask->info.fillHistory == 1) {
streamTaskEnablePause(pTask);
ASSERT(pTask->status.pauseAllowed == true);
}
streamSourceScanHistoryData(pTask);
if (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
double el = taosGetTimestampMs() - pTask->tsInfo.step1Start;
double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0;
tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el,
TASK_SCHED_STATUS__INACTIVE);
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
......@@ -1129,26 +1134,24 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
SStreamTask* pStreamTask = NULL;
bool done = false;
// if (!pReq->igUntreated && !streamTaskRecoverScanStep1Finished(pTask)) {
// 1. stop the related stream task, get the current scan wal version of stream task, ver.
// 1. get the related stream task
pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId);
if (pStreamTask == NULL) {
// todo delete this task, if the related stream task is dropped
qError("failed to find s-task:0x%x, it may have been destroyed, drop fill-history task:%s",
pTask->streamTaskId.taskId, pTask->id.idStr);
pTask->status.taskStatus = TASK_STATUS__DROPPING;
tqDebug("s-task:%s fill-history task set status to be dropping", id);
streamMetaSaveTask(pMeta, pTask);
streamMetaUnregisterTask(pMeta, pTask->id.taskId);
streamMetaReleaseTask(pMeta, pTask);
return -1;
}
ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);
// stream task in TASK_STATUS__SCAN_HISTORY can not be paused.
// wait for the stream task get ready for scan history data
// 2. it cannot be paused, when the stream task in TASK_STATUS__SCAN_HISTORY status. Let's wait for the
// stream task get ready for scan history data
while (pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
tqDebug(
"s-task:%s level:%d related stream task:%s(status:%s) not ready for halt, wait for it and recheck in 100ms",
......@@ -1158,6 +1161,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
// now we can stop the stream task execution
streamTaskHalt(pStreamTask);
tqDebug("s-task:%s level:%d sched-status:%d is halt by fill-history task:%s", pStreamTask->id.idStr,
pStreamTask->info.taskLevel, pStreamTask->status.schedStatus, id);
......
......@@ -404,6 +404,8 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
streamTaskReleaseState(pTask);
streamTaskReloadState(pStreamTask);
// clear the link between fill-history task and stream task info
pStreamTask->historyTaskId.taskId = 0;
streamTaskResumeFromHalt(pStreamTask);
qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr);
......@@ -414,6 +416,7 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
// save to disk
taosWLockLatch(&pMeta->lock);
streamMetaSaveTask(pMeta, pStreamTask);
if (streamMetaCommit(pMeta) < 0) {
// persist to disk
......
......@@ -832,7 +832,7 @@ void streamTaskPause(SStreamTask* pTask) {
return;
}
while(!pTask->status.pauseAllowed || (pTask->status.taskStatus == TASK_STATUS__HALT)) {
while (!pTask->status.pauseAllowed || (pTask->status.taskStatus == TASK_STATUS__HALT)) {
status = pTask->status.taskStatus;
if (status == TASK_STATUS__DROPPING) {
qDebug("vgId:%d s-task:%s task already dropped, do nothing", pMeta->vgId, pTask->id.idStr);
......@@ -849,8 +849,19 @@ void streamTaskPause(SStreamTask* pTask) {
taosMsleep(100);
}
// todo: use the lock of the task.
taosWLockLatch(&pMeta->lock);
status = pTask->status.taskStatus;
if (status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP) {
taosWUnLockLatch(&pMeta->lock);
qDebug("vgId:%d s-task:%s task already dropped/stopped/paused, do nothing", pMeta->vgId, pTask->id.idStr);
return;
}
atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
taosWUnLockLatch(&pMeta->lock);
int64_t el = taosGetTimestampMs() - st;
qDebug("vgId:%d s-task:%s set pause flag, prev:%s, elapsed time:%dms", pMeta->vgId, pTask->id.idStr,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册