From c296dd0a6394aa7e7801d073cdb58bc24ae039f4 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 23 Aug 2022 20:33:26 +0800 Subject: [PATCH] fix compile --- include/libs/executor/executor.h | 25 ++++++++++++------------- source/libs/executor/CMakeLists.txt | 3 +-- source/libs/executor/src/executorimpl.c | 4 ++-- source/libs/stream/src/streamExec.c | 1 - source/libs/stream/src/streamMeta.c | 9 +++++---- 5 files changed, 20 insertions(+), 22 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index fb572b5591..1ce88905c2 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -23,7 +23,6 @@ extern "C" { #include "query.h" #include "tcommon.h" #include "tmsgcb.h" -#include "tstream.h" typedef void* qTaskInfo_t; typedef void* DataSinkHandle; @@ -31,18 +30,18 @@ struct SRpcMsg; struct SSubplan; typedef struct { - void* tqReader; - void* meta; - void* config; - void* vnode; - void* mnd; - SMsgCb* pMsgCb; - int64_t version; - bool initMetaReader; - bool initTableReader; - bool initTqReader; - int32_t numOfVgroups; - SStreamState* pState; + void* tqReader; + void* meta; + void* config; + void* vnode; + void* mnd; + SMsgCb* pMsgCb; + int64_t version; + bool initMetaReader; + bool initTableReader; + bool initTqReader; + int32_t numOfVgroups; + void* pStateBackend; } SReadHandle; // in queue mode, data streams are seperated by msg diff --git a/source/libs/executor/CMakeLists.txt b/source/libs/executor/CMakeLists.txt index e0e2a7a267..89d08b3078 100644 --- a/source/libs/executor/CMakeLists.txt +++ b/source/libs/executor/CMakeLists.txt @@ -8,8 +8,7 @@ add_library(executor STATIC ${EXECUTOR_SRC}) # ) target_link_libraries(executor - PUBLIC stream - PRIVATE os util common function parser planner qcom vnode scalar nodes index + PRIVATE os util common function parser planner qcom vnode scalar nodes index stream ) target_include_directories( diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index f7577aee9c..dc2389e25f 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4616,8 +4616,8 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead goto _complete; } - if (pHandle && pHandle->pState) { - (*pTaskInfo)->streamInfo.pState = pHandle->pState; + if (pHandle && pHandle->pStateBackend) { + (*pTaskInfo)->streamInfo.pState = pHandle->pStateBackend; } (*pTaskInfo)->sql = sql; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 06ca26f029..102bad7426 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -140,7 +140,6 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch) return 0; } -// TODO: handle version int32_t streamExecForAll(SStreamTask* pTask) { while (1) { int32_t batchCnt = 1; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 2386dbb3c4..0041d800a7 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -23,16 +23,17 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - char streamPath[200]; + int32_t len = strlen(path) + 20; + char* streamPath = taosMemoryCalloc(1, len); sprintf(streamPath, "%s/%s", path, "stream"); pMeta->path = strdup(streamPath); if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db) < 0) { goto _err; } - char checkpointPath[200]; - sprintf(checkpointPath, "%s/%s", streamPath, "checkpoints"); - mkdir(checkpointPath, 0755); + sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints"); + mkdir(streamPath, 0755); + taosMemoryFree(streamPath); if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb) < 0) { goto _err; -- GitLab