From 870d75bf82455dea3e7daef7f6754ffeb8b83462 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Tue, 20 Jun 2023 11:34:09 +0800 Subject: [PATCH] fill history pause&resume --- source/dnode/vnode/src/tq/tq.c | 24 +++++++++++------------- source/libs/stream/src/streamExec.c | 16 +++++++++++++++- 2 files changed, 26 insertions(+), 14 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 36abd581ee..57d5cd8eb0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -779,9 +779,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { SStreamTask* pSateTask = pTask; - // if (pTask->info.fillHistory) { - // pSateTask = *(SStreamTask**)taosHashGet(pTq->pStreamMeta->pTasks, &pTask->streamTaskId.taskId, sizeof(int32_t)); - // } + if (pTask->info.fillHistory) { + pSateTask = *(SStreamTask**)taosHashGet(pTq->pStreamMeta->pTasks, &pTask->streamTaskId.taskId, sizeof(int32_t)); + } pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pSateTask, false, -1, -1); if (pTask->pState == NULL) { return -1; @@ -798,9 +798,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { SStreamTask* pSateTask = pTask; - // if (pTask->info.fillHistory) { - // pSateTask = *(SStreamTask**)taosHashGet(pTq->pStreamMeta->pTasks, &pTask->streamTaskId.taskId, sizeof(int32_t)); - // } + if (pTask->info.fillHistory) { + pSateTask = *(SStreamTask**)taosHashGet(pTq->pStreamMeta->pTasks, &pTask->streamTaskId.taskId, sizeof(int32_t)); + } pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pSateTask, false, -1, -1); if (pTask->pState == NULL) { return -1; @@ -1134,9 +1134,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { pTask->status.taskStatus = TASK_STATUS__DROPPING; tqDebug("s-task:%s set status to be dropping", pTask->id.idStr); // transfer the ownership of executor state - // todo(liuyao) - // streamTaskReleaseState(pTask); - // streamTaskReloadState(pStreamTask); + streamTaskReleaseState(pTask); + streamTaskReloadState(pStreamTask); streamMetaSaveTask(pMeta, pTask); streamMetaSaveTask(pMeta, pStreamTask); @@ -1183,10 +1182,9 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t sversion, char* msg, int return -1; } // transfer the ownership of executor state - // todo(liuyao) - // streamTaskReleaseState(pTask); - // SStreamTask* pStreamTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->streamTaskId.taskId); - // streamTaskReloadState(pStreamTask); + streamTaskReleaseState(pTask); + SStreamTask* pStreamTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->streamTaskId.taskId); + streamTaskReloadState(pStreamTask); ASSERT(pTask->streamTaskId.taskId != 0); pTask->status.transferState = true; // persistent data? diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 8793b11ab1..1af5948268 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -196,7 +196,21 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { } else { qSetStreamOpOpen(exec); if (streamTaskShouldPause(&pTask->status)) { - taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); + SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); + if (qRes == NULL) { + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + qRes->type = STREAM_INPUT__DATA_BLOCK; + qRes->blocks = pRes; + code = streamTaskOutputResultBlock(pTask, qRes); + if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); + taosFreeQitem(qRes); + return code; + } return 0; } } -- GitLab