From 86c0fb56e0b29be59a09eed2e435e3c5ae1cd422 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 19 Sep 2022 17:27:25 +0800 Subject: [PATCH] enhance stream backend for sma --- include/libs/stream/streamState.h | 2 +- source/dnode/vnode/src/tq/tq.c | 4 ++-- source/libs/stream/src/streamState.c | 9 +++++++-- source/libs/wal/src/walMeta.c | 2 +- 4 files changed, 11 insertions(+), 6 deletions(-) diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 80fa7a7218..849d83a58b 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -34,7 +34,7 @@ typedef struct { TXN txn; } SStreamState; -SStreamState* streamStateOpen(char* path, SStreamTask* pTask); +SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath); void streamStateClose(SStreamState* pState); int32_t streamStateBegin(SStreamState* pState); int32_t streamStateCommit(SStreamState* pState); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c8841e5e16..db3076e1bd 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -760,7 +760,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) { // expand executor if (pTask->taskLevel == TASK_LEVEL__SOURCE) { - pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask); + pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false); if (pTask->pState == NULL) { return -1; } @@ -774,7 +774,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) { pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); ASSERT(pTask->exec.executor); } else if (pTask->taskLevel == TASK_LEVEL__AGG) { - pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask); + pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false); if (pTask->pState == NULL) { return -1; } diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 6cd5132bb9..5c7c5e7a6d 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -18,14 +18,19 @@ #include "tcommon.h" #include "ttimer.h" -SStreamState* streamStateOpen(char* path, SStreamTask* pTask) { +SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath) { SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState)); if (pState == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } + char statePath[300]; - sprintf(statePath, "%s/%d", path, pTask->taskId); + if (!specPath) { + sprintf(statePath, "%s/%d", path, pTask->taskId); + } else { + memcpy(statePath, path, 300); + } if (tdbOpen(statePath, 4096, 256, &pState->db) < 0) { goto _err; } diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 93ced912f8..95a861177b 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -268,7 +268,7 @@ int walRollFileInfo(SWal* pWal) { char* walMetaSerialize(SWal* pWal) { char buf[30]; ASSERT(pWal->fileInfoSet); - int sz = pWal->fileInfoSet->size; + int sz = taosArrayGetSize(pWal->fileInfoSet); cJSON* pRoot = cJSON_CreateObject(); cJSON* pMeta = cJSON_CreateObject(); cJSON* pFiles = cJSON_CreateArray(); -- GitLab