Commit 53377c2c authored by Haojun Liao

fix(stream): wait for stream task completed.

Parent 9a3708e1
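For orientation, here is a minimal C sketch of the flow the hunks below add to streamExecForAll(): halt the related stream task if needed, wait until it has drained its input queue, expand its scan window, then restore it to normal and reschedule it. The helper name transferToNormalStateWhenIdle() is hypothetical and only illustrates the sequence; the calls it makes all appear in this diff.

// Illustrative sketch of the new wait-then-transfer flow (helper name is hypothetical).
static void transferToNormalStateWhenIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
  // non-source stream tasks are still in NORMAL state and must be halted explicitly
  if (pStreamTask->info.taskLevel != TASK_LEVEL__SOURCE) {
    pStreamTask->status.taskStatus = TASK_STATUS__HALT;
  }

  // wait for the stream task to consume everything already in its input queue
  while (!streamTaskIsIdle(pStreamTask)) {
    qDebug("s-task:%s wait for stream task:%s to be idle, check again in 100ms",
           pTask->id.idStr, pStreamTask->id.idStr);
    taosMsleep(100);
  }

  // expand the query time window for the stream scanner, then resume normal execution
  pStreamTask->dataRange.window.skey = INT64_MIN;
  streamSetStatusNormal(pStreamTask);
  streamMetaSaveTask(pTask->pMeta, pStreamTask);
  streamSchedExec(pStreamTask);
}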
......
@@ -44,9 +44,8 @@ enum {
TASK_STATUS__DROPPING,
TASK_STATUS__FAIL,
TASK_STATUS__STOP,
TASK_STATUS__WAIT_DOWNSTREAM,
TASK_STATUS__SCAN_HISTORY,
TASK_STATUS__HALT, // stream task halt to wait for the secondary scan history, this status is invisible for user
TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner
TASK_STATUS__HALT, // stream task will handle all data in the input queue, and then pause
TASK_STATUS__PAUSE,
};
......
@@ -565,6 +564,7 @@ int32_t streamSchedExec(SStreamTask* pTask);
int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock);
bool streamTaskShouldStop(const SStreamStatus* pStatus);
bool streamTaskShouldPause(const SStreamStatus* pStatus);
bool streamTaskIsIdle(const SStreamTask* pTask);
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz);
......
......
@@ -1121,8 +1121,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
taosMsleep(100);
}
taosMsleep(10000);
// now we can stop the stream task execution
pStreamTask->status.taskStatus = TASK_STATUS__HALT;
tqDebug("s-task:%s level:%d status is set to halt by history scan task:%s", pStreamTask->id.idStr,
......
@@ -1378,12 +1376,13 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
if (pTask != NULL) {
if (pTask->status.taskStatus == TASK_STATUS__NORMAL) {
tqDebug("vgId:%d s-task:%s start to process block from wal, last chk point:%" PRId64, vgId, pTask->id.idStr,
int8_t status = pTask->status.taskStatus;
if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__HALT) {
tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr,
pTask->chkInfo.version);
streamProcessRunReq(pTask);
} else {
if (streamTaskShouldPause(&pTask->status) || (pTask->status.taskStatus == TASK_STATUS__HALT)) {
if (streamTaskShouldPause(&pTask->status)) {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
}
......
......
@@ -189,6 +189,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
if (qExecTask(exec, &output, &ts) < 0) {
continue;
}
if (output == NULL) {
if (qStreamRecoverScanFinished(exec)) {
finished = true;
......
@@ -396,16 +397,30 @@ int32_t streamExecForAll(SStreamTask* pTask) {
ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId);
STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;
// here we need to wait for the stream task to handle all data in the input queue.
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__HALT);
} else {
ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__NORMAL);
pStreamTask->status.taskStatus = TASK_STATUS__HALT;
}
{ // wait for the stream task to be idle
while(!streamTaskIsIdle(pStreamTask)) {
qDebug("s-task:%s level:%d wait for stream task:%s to be idle, check again in 100ms", pTask->id.idStr,
pTask->info.taskLevel, pStreamTask->id.idStr);
taosMsleep(100);
}
}
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
// update the scan data range for source task.
qDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " transfer to %" PRId64 " - %" PRId64
", status:%s, sched-status:%d",
pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN,
pTimeWindow->ekey, streamGetTaskStatusStr(TASK_STATUS__NORMAL), pStreamTask->status.schedStatus);
} else {
// for agg task and sink task, they are continue to execute, no need to be halt.
// sink tasks continue to execute and do not need to be halted.
// the process should be paused briefly while the task state is being transferred,
// OR wait until the inputQ && outputQ of agg tasks are fully consumed, and then start the state transfer
......
@@ -413,12 +428,13 @@ int32_t streamExecForAll(SStreamTask* pTask) {
qDebug("s-task:%s no need to update time window, for non-source task", pStreamTask->id.idStr);
}
// expand the query time window for stream scanner
pTimeWindow->skey = INT64_MIN;
streamSetStatusNormal(pStreamTask);
streamMetaSaveTask(pTask->pMeta, pStreamTask);
if (streamMetaCommit(pTask->pMeta)) {
// persistent to disk for
// persist to disk
}
streamSchedExec(pStreamTask);
......
@@ -481,12 +497,32 @@ int32_t streamExecForAll(SStreamTask* pTask) {
double el = (taosGetTimestampMs() - st) / 1000.0;
qDebug("s-task:%s batch of (%d)input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d",
id, batchSize, el, resSize / 1048576.0, totalBlocks);
streamFreeQitem(pInput);
}
return 0;
}
bool streamTaskIsIdle(const SStreamTask* pTask) {
int32_t numOfItems = taosQueueItemSize(pTask->inputQueue->queue);
if (numOfItems > 0) {
return false;
}
numOfItems = taosQallItemSize(pTask->inputQueue->qall);
if (numOfItems > 0) {
return false;
}
// blocked by downstream task
if (pTask->outputStatus == TASK_OUTPUT_STATUS__BLOCKED) {
return false;
}
return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE);
}
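For clarity, a hedged usage sketch of streamTaskIsIdle() above: a caller that refuses to flip a task to HALT while the task still has pending work. The function doHaltWhenIdle() is illustrative only and does not appear in this commit.

// Illustrative caller (not in this commit): only halt the task once it is idle.
static bool doHaltWhenIdle(SStreamTask* pTask) {
  if (!streamTaskIsIdle(pTask)) {
    // pending items in inputQueue/qall, a blocked output, or an active sched
    // status all keep the task non-idle, so halting now could drop in-flight data
    return false;
  }
  pTask->status.taskStatus = TASK_STATUS__HALT;
  return true;
}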
int32_t streamTryExec(SStreamTask* pTask) {
// this function may be executed by multi-threads, so status check is required.
int8_t schedStatus =
......
......
@@ -40,7 +40,6 @@ int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated) {
const char* streamGetTaskStatusStr(int32_t status) {
switch(status) {
case TASK_STATUS__NORMAL: return "normal";
case TASK_STATUS__WAIT_DOWNSTREAM: return "wait-for-downstream";
case TASK_STATUS__SCAN_HISTORY: return "scan-history";
case TASK_STATUS__HALT: return "halt";
case TASK_STATUS__PAUSE: return "paused";
......
@@ -217,18 +216,17 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
return -1;
}
// set the flag indicating that the downstream tasks have been checked
ASSERT(pTask->status.checkDownstream == 0);
pTask->status.checkDownstream = 1;
ASSERT(pTask->status.taskStatus != TASK_STATUS__HALT);
ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY || pTask->status.taskStatus == TASK_STATUS__NORMAL);
if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
qDebug("s-task:%s fixed downstream task is ready, now enter into scan-history-data stage, status:%s", id,
streamGetTaskStatusStr(pTask->status.taskStatus));
streamTaskLaunchScanHistory(pTask);
} else {
ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL);
qDebug("s-task:%s fixed downstream task is ready, now ready for data from wal, status:%s", id,
qDebug("s-task:%s fixed downstream task is ready, ready for data from inputQ, status:%s", id,
streamGetTaskStatusStr(pTask->status.taskStatus));
}
} else {
......
@@ -396,15 +394,17 @@ int32_t streamAggRecoverPrepare(SStreamTask* pTask) {
return 0;
}
int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask) {
int32_t streamAggUpstreamScanHistoryFinish(SStreamTask* pTask) {
void* exec = pTask->exec.pExecutor;
if (qRestoreStreamOperatorOption(exec) < 0) {
return -1;
}
if (qStreamRecoverFinish(exec) < 0) {
return -1;
}
streamSetStatusNormal(pTask);
// streamSetStatusNormal(pTask);
return 0;
}
......
@@ -414,8 +414,9 @@ int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId) {
ASSERT(left >= 0);
if (left == 0) {
qDebug("s-task:%s all %d upstream tasks finish scan-history data", pTask->id.idStr, left);
streamAggChildrenRecoverFinish(pTask);
int32_t numOfTasks = taosArrayGetSize(pTask->pUpstreamEpInfoList);
qDebug("s-task:%s all %d upstream tasks finish scan-history data", pTask->id.idStr, numOfTasks);
streamAggUpstreamScanHistoryFinish(pTask);
} else {
qDebug("s-task:%s remain unfinished upstream tasks:%d", pTask->id.idStr, left);
}
......