From 13028744df296a441372e02b2909389628ca20f8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 15 Jun 2023 10:24:21 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/dnode/vnode/src/tq/tq.c | 7 +- source/libs/stream/src/streamExec.c | 116 ++++++++++++++++------------ 2 files changed, 69 insertions(+), 54 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index e1cf46a5f4..6f05d67b5e 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1177,12 +1177,13 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { tqDebug("s-task:%s set status to be dropping", pTask->id.idStr); streamMetaSaveTask(pMeta, pTask); - streamMetaReleaseTask(pMeta, pTask); -// streamMetaRemoveTask(pMeta, pTask->id.taskId); + streamMetaSaveTask(pMeta, pStreamTask); + streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pStreamTask); - if (streamMetaCommit(pTask->pMeta) < 0) { + if (streamMetaCommit(pTask->pMeta) < 0) { + // persist to disk } } else { // todo update the chkInfo version for current task. 
diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c
index b68d15328d..907360dff0 100644
--- a/source/libs/stream/src/streamExec.c
+++ b/source/libs/stream/src/streamExec.c
@@ -313,6 +313,80 @@ int32_t updateCheckPointInfo(SStreamTask* pTask) {
   return TSDB_CODE_SUCCESS;
 }
 
+/**
+ * Poll the stream task until streamTaskIsIdle() reports true, sleeping 100ms between checks.
+ * pTask only labels the log lines; pStreamTask is the task being waited on.
+ */
+static void waitForTaskTobeIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
+  // wait for the stream task to be idle
+  int64_t st = taosGetTimestampMs();
+
+  while (!streamTaskIsIdle(pStreamTask)) {
+    qDebug("s-task:%s level:%d wait for stream task:%s to be idle, check again in 100ms", pTask->id.idStr,
+           pTask->info.taskLevel, pStreamTask->id.idStr);
+    taosMsleep(100);
+  }
+
+  double el = (taosGetTimestampMs() - st) / 1000.0;
+  if (el > 0) {
+    qDebug("s-task:%s wait for stream task:%s for %.2fs to execute all data in inputQ", pTask->id.idStr,
+           pStreamTask->id.idStr, el);
+  }
+}
+
+/**
+ * Run when a scan-history task (pTask) has ended: acquire its related stream task, mark it HALT, wait
+ * until it is idle, move the start of its scan window to INT64_MIN, then call streamSetStatusNormal()
+ * and streamSchedExec() on it and release the reference. Always returns TSDB_CODE_SUCCESS.
+ */
+static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
+  SStreamTask* pStreamTask = streamMetaAcquireTask(pTask->pMeta, pTask->streamTaskId.taskId);
+
+  // todo handle stream task is dropped here
+
+  // check the acquired task before the log line below dereferences it
+  ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId);
+  qDebug("s-task:%s scan history task end, update stream task:%s info and launch it", pTask->id.idStr, pStreamTask->id.idStr);
+
+  STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;
+
+  // here we need to wait for the stream task handle all data in the input queue.
+  if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
+    ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__HALT);
+  } else {
+    ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__NORMAL);
+    pStreamTask->status.taskStatus = TASK_STATUS__HALT;
+  }
+
+  // wait for the stream task to be idle
+  waitForTaskTobeIdle(pTask, pStreamTask);
+
+  if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
+    // update the scan data range for source task.
+    qDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " transfer to %" PRId64 " - %" PRId64
+           ", status:%s, sched-status:%d",
+           pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN,
+           pTimeWindow->ekey, streamGetTaskStatusStr(TASK_STATUS__NORMAL), pStreamTask->status.schedStatus);
+
+    // todo transfer state
+  } else {
+    // for sink tasks, they are continue to execute, no need to be halt.
+    // the process should be stopped for a while, during the term of transfer task state.
+    // OR wait for the inputQ && outputQ of agg tasks are all consumed, and then start the state transfer
+    qDebug("s-task:%s no need to update time window, for non-source task", pStreamTask->id.idStr);
+
+    // todo transfer state
+  }
+
+  // expand the query time window for stream scanner
+  pTimeWindow->skey = INT64_MIN;
+
+  streamSetStatusNormal(pStreamTask);
+  streamSchedExec(pStreamTask);
+  streamMetaReleaseTask(pTask->pMeta, pStreamTask);
+  return TSDB_CODE_SUCCESS;
+}
+
 /**
  * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
  * appropriate batch of blocks should be handled in 5 to 10 sec.
@@ -388,57 +462,8 @@ int32_t streamExecForAll(SStreamTask* pTask) {
     }
 
     if (pInput == NULL) {
-      if (pTask->info.fillHistory && pTask->status.transferState) {
-        // todo transfer task state here
-
-        SStreamTask* pStreamTask = streamMetaAcquireTask(pTask->pMeta, pTask->streamTaskId.taskId);
-        qDebug("s-task:%s scan history task end, update stream task:%s info and launch it", pTask->id.idStr, pStreamTask->id.idStr);
-
-        ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId);
-        STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;
-
-        // here we need to wait for the stream task handle all data in the input queue.
- if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) { - ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__HALT); - } else { - ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__NORMAL); - pStreamTask->status.taskStatus = TASK_STATUS__HALT; - } - - {// wait for the stream task to be idle - while(!streamTaskIsIdle(pStreamTask)) { - qDebug("s-task:%s level:%d wait for stream task:%s to be idle, check again in 100ms", pTask->id.idStr, - pTask->info.taskLevel, pStreamTask->id.idStr); - taosMsleep(100); - } - } - - if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) { - // update the scan data range for source task. - qDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " transfer to %" PRId64 " - %" PRId64 - ", status:%s, sched-status:%d", - pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN, - pTimeWindow->ekey, streamGetTaskStatusStr(TASK_STATUS__NORMAL), pStreamTask->status.schedStatus); - } else { - // for sink tasks, they are continue to execute, no need to be halt. - // the process should be stopped for a while, during the term of transfer task state. - // OR wait for the inputQ && outputQ of agg tasks are all consumed, and then start the state transfer - - - qDebug("s-task:%s no need to update time window, for non-source task", pStreamTask->id.idStr); - } - - // expand the query time window for stream scanner - pTimeWindow->skey = INT64_MIN; - - streamSetStatusNormal(pStreamTask); - streamMetaSaveTask(pTask->pMeta, pStreamTask); - if (streamMetaCommit(pTask->pMeta)) { - // persistent to disk - } - - streamSchedExec(pStreamTask); - streamMetaReleaseTask(pTask->pMeta, pStreamTask); + if (pTask->info.fillHistory && pTask->status.transferState) { + int32_t code = streamTransferStateToStreamTask(pTask); } break; -- GitLab