提交 d72580da 编写于 作者: H Haojun Liao

fix(stream): transfer the state for agg tasks.

上级 d45596bb
...@@ -292,12 +292,12 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) { ...@@ -292,12 +292,12 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
} }
} }
static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta; SStreamMeta* pMeta = pTask->pMeta;
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId); SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId);
if (pStreamTask == NULL) { if (pStreamTask == NULL) {
// todo: destroy this task here // todo: destroy the fill-history task here
qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed", pTask->id.idStr, qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed", pTask->id.idStr,
pTask->streamTaskId.taskId); pTask->streamTaskId.taskId);
return TSDB_CODE_STREAM_TASK_NOT_EXIST; return TSDB_CODE_STREAM_TASK_NOT_EXIST;
...@@ -338,34 +338,36 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { ...@@ -338,34 +338,36 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
qDebug("s-task:%s no need to update time window for non-source task", pStreamTask->id.idStr); qDebug("s-task:%s no need to update time window for non-source task", pStreamTask->id.idStr);
} }
// expand the query time window for stream scanner // 1. expand the query time window for stream task of WAL scanner
pTimeWindow->skey = INT64_MIN; pTimeWindow->skey = INT64_MIN;
qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor); qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor);
// transfer the ownership of executor state // 2. transfer the ownership of executor state
streamTaskReleaseState(pTask); streamTaskReleaseState(pTask);
streamTaskReloadState(pStreamTask); streamTaskReloadState(pStreamTask);
// clear the link between fill-history task and stream task info // 3. clear the link between fill-history task and stream task info
pStreamTask->historyTaskId.taskId = 0; pStreamTask->historyTaskId.taskId = 0;
// 4. resume the state of stream task, after this function, the stream task will run immidately. But it can not be
// pause, since the pause allowed attribute is not set yet.
streamTaskResumeFromHalt(pStreamTask); streamTaskResumeFromHalt(pStreamTask);
qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr); qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr);
int32_t taskId = pTask->id.taskId; int32_t taskId = pTask->id.taskId;
// free it and remove it from disk meta-store // 5. free it and remove fill-history task from disk meta-store
streamMetaUnregisterTask(pMeta, taskId); streamMetaUnregisterTask(pMeta, taskId);
// save to disk // 6. save to disk
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
streamMetaSaveTask(pMeta, pStreamTask); streamMetaSaveTask(pMeta, pStreamTask);
if (streamMetaCommit(pMeta) < 0) { if (streamMetaCommit(pMeta) < 0) {
// persist to disk // persist to disk
} }
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
// pause allowed // 7. pause allowed.
streamTaskEnablePause(pStreamTask); streamTaskEnablePause(pStreamTask);
streamSchedExec(pStreamTask); streamSchedExec(pStreamTask);
...@@ -373,6 +375,25 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { ...@@ -373,6 +375,25 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
int32_t code = TSDB_CODE_SUCCESS;
if (!pTask->status.transferState) {
return code;
}
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
streamTaskFillHistoryFinished(pTask);
streamTaskEndScanWAL(pTask);
} else { // do transfer task operator states.
code = streamDoTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) { // todo handle this
return code;
}
}
return code;
}
static int32_t extractMsgFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, static int32_t extractMsgFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
const char* id) { const char* id) {
int32_t retryTimes = 0; int32_t retryTimes = 0;
...@@ -526,17 +547,16 @@ int32_t streamTaskEndScanWAL(SStreamTask* pTask) { ...@@ -526,17 +547,16 @@ int32_t streamTaskEndScanWAL(SStreamTask* pTask) {
double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0; double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0;
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el); qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el);
// 3. notify downstream tasks to transfer executor state after handle all history blocks. // 1. notify all downstream tasks to transfer executor state after handle all history blocks.
pTask->status.transferState = true;
int32_t code = streamDispatchTransferStateMsg(pTask); int32_t code = streamDispatchTransferStateMsg(pTask);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
// todo handle error // todo handle error
} }
// the last execution of fill-history task, in order to transfer task operator states. // 2. do transfer stream task operator states.
code = streamTransferStateToStreamTask(pTask); pTask->status.transferState = true;
if (code != TSDB_CODE_SUCCESS) { // todo handle this code = streamDoTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) { // todo handle error
return code; return code;
} }
...@@ -560,9 +580,11 @@ int32_t streamTryExec(SStreamTask* pTask) { ...@@ -560,9 +580,11 @@ int32_t streamTryExec(SStreamTask* pTask) {
// todo the task should be commit here // todo the task should be commit here
if (taosQueueEmpty(pTask->inputQueue->queue)) { if (taosQueueEmpty(pTask->inputQueue->queue)) {
// fill-history WAL scan has completed // fill-history WAL scan has completed
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && pTask->status.transferState == true) { if (pTask->status.transferState) {
streamTaskFillHistoryFinished(pTask); code = streamTransferStateToStreamTask(pTask);
streamTaskEndScanWAL(pTask); if (code != TSDB_CODE_SUCCESS) {
return code;
}
} else { } else {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus), qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus),
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册