diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 80fa7a72187f13b82a5979588b2bd0b9028ea715..849d83a58b09d99656d2b375af27678d77bac3d5 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -34,7 +34,7 @@ typedef struct { TXN txn; } SStreamState; -SStreamState* streamStateOpen(char* path, SStreamTask* pTask); +SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath); void streamStateClose(SStreamState* pState); int32_t streamStateBegin(SStreamState* pState); int32_t streamStateCommit(SStreamState* pState); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c8841e5e169c7cdf249894348ab949a225f58acc..db3076e1bdc9ba5923f8d539f6fe827548b805a9 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -760,7 +760,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) { // expand executor if (pTask->taskLevel == TASK_LEVEL__SOURCE) { - pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask); + pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false); if (pTask->pState == NULL) { return -1; } @@ -774,7 +774,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) { pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); ASSERT(pTask->exec.executor); } else if (pTask->taskLevel == TASK_LEVEL__AGG) { - pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask); + pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false); if (pTask->pState == NULL) { return -1; } diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 6cd5132bb9d9021d5c0fe590224414430d36af40..5c7c5e7a6d1acf6e4bc9de6426ece7c37d505908 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -18,14 +18,19 @@ #include "tcommon.h" #include "ttimer.h" -SStreamState* streamStateOpen(char* path, SStreamTask* pTask) { +SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath) { SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState)); if (pState == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } + char statePath[300]; - sprintf(statePath, "%s/%d", path, pTask->taskId); + if (!specPath) { + sprintf(statePath, "%s/%d", path, pTask->taskId); + } else { + memcpy(statePath, path, 300); + } if (tdbOpen(statePath, 4096, 256, &pState->db) < 0) { goto _err; } diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 93ced912f8e2358c2aab6f04957ce060cf61c924..95a861177ba0cfd4b0e32d8c88ac774004958d73 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -268,7 +268,7 @@ int walRollFileInfo(SWal* pWal) { char* walMetaSerialize(SWal* pWal) { char buf[30]; ASSERT(pWal->fileInfoSet); - int sz = pWal->fileInfoSet->size; + int sz = taosArrayGetSize(pWal->fileInfoSet); cJSON* pRoot = cJSON_CreateObject(); cJSON* pMeta = cJSON_CreateObject(); cJSON* pFiles = cJSON_CreateArray();