From 05d416f3b960a5dd293a42a6151a261217ce2b1a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 28 Apr 2023 23:53:53 +0800 Subject: [PATCH] fix(stream): fix invalid access when handling error, not start stream if tsDisablestream is set. --- source/dnode/vnode/src/vnd/vnodeSync.c | 7 ++++++- source/libs/executor/src/executor.c | 1 - source/libs/executor/src/querytask.c | 4 ++++ 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index dc2d709d76..1f958c569f 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -555,7 +555,12 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) vInfo("vgId:%d, sync restore finished, start to restore stream tasks by replay wal", pVnode->config.vgId); // start to restore all stream tasks - tqStartStreamTasks(pVnode->pTq); + if (tsDisableStream) { + vInfo("vgId:%d, not restore stream tasks, since disabled", pVnode->config.vgId); + } else { + vInfo("vgId:%d start to restore stream tasks", pVnode->config.vgId); + tqStartStreamTasks(pVnode->pTq); + } } static void vnodeBecomeFollower(const SSyncFSM *pFsm) { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 655952e378..2d991a14f5 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -312,7 +312,6 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v qTaskInfo_t pTaskInfo = NULL; code = qCreateExecTask(readers, vgId, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM); if (code != TSDB_CODE_SUCCESS) { - nodesDestroyNode((SNode*)pPlan); qDestroyTask(pTaskInfo); terrno = code; return NULL; diff --git a/source/libs/executor/src/querytask.c b/source/libs/executor/src/querytask.c index b6b250a325..a4d8327b6a 100644 --- a/source/libs/executor/src/querytask.c +++ b/source/libs/executor/src/querytask.c @@ -99,6 +99,7 @@ int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHand if (NULL == (*pTaskInfo)->pRoot) { int32_t code = (*pTaskInfo)->code; doDestroyTask(*pTaskInfo); + (*pTaskInfo) = NULL; return code; } else { return TSDB_CODE_SUCCESS; @@ -206,11 +207,14 @@ static void freeBlock(void* pParam) { void doDestroyTask(SExecTaskInfo* pTaskInfo) { qDebug("%s execTask is freed", GET_TASKID(pTaskInfo)); destroyOperator(pTaskInfo->pRoot); + pTaskInfo->pRoot = NULL; + cleanupQueriedTableScanInfo(&pTaskInfo->schemaInfo); cleanupStreamInfo(&pTaskInfo->streamInfo); if (!pTaskInfo->localFetch.localExec) { nodesDestroyNode((SNode*)pTaskInfo->pSubplan); + pTaskInfo->pSubplan = NULL; } taosArrayDestroyEx(pTaskInfo->pResultBlockList, freeBlock); -- GitLab