From fe4e45647ea854cfab9cbc7aabc0054ced5117c0 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 22 Aug 2023 18:29:25 +0800 Subject: [PATCH] fix: global data sink manager issue --- source/libs/executor/src/dataDeleter.c | 2 ++ source/libs/executor/src/dataDispatcher.c | 1 + source/libs/executor/src/dataInserter.c | 2 ++ source/libs/executor/src/dataSinkMgt.c | 22 ++++++++++++++-------- source/libs/executor/src/executor.c | 5 +++-- 5 files changed, 22 insertions(+), 10 deletions(-) diff --git a/source/libs/executor/src/dataDeleter.c b/source/libs/executor/src/dataDeleter.c index 11074b0e94..a30dc47029 100644 --- a/source/libs/executor/src/dataDeleter.c +++ b/source/libs/executor/src/dataDeleter.c @@ -224,6 +224,8 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { } taosCloseQueue(pDeleter->pDataBlocks); taosThreadMutexDestroy(&pDeleter->mutex); + + taosMemoryFree(pDeleter->pManager); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 2a22656d8c..56d0ca523a 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -226,6 +226,7 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { } taosCloseQueue(pDispatcher->pDataBlocks); taosThreadMutexDestroy(&pDispatcher->mutex); + taosMemoryFree(pDispatcher->pManager); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index 646964ebf4..f20293791b 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -395,6 +395,8 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { taosMemoryFree(pInserter->pParam); taosHashCleanup(pInserter->pCols); taosThreadMutexDestroy(&pInserter->mutex); + + taosMemoryFree(pInserter->pManager); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/dataSinkMgt.c b/source/libs/executor/src/dataSinkMgt.c index 3a972c1c20..f07d176440 100644 --- a/source/libs/executor/src/dataSinkMgt.c +++ b/source/libs/executor/src/dataSinkMgt.c @@ -18,12 +18,17 @@ #include "planner.h" #include "tarray.h" -static SDataSinkManager gDataSinkManager = {0}; SDataSinkStat gDataSinkStat = {0}; -int32_t dsDataSinkMgtInit(SDataSinkMgtCfg* cfg, SStorageAPI* pAPI) { - gDataSinkManager.cfg = *cfg; - gDataSinkManager.pAPI = pAPI; +int32_t dsDataSinkMgtInit(SDataSinkMgtCfg* cfg, SStorageAPI* pAPI, void** ppSinkManager) { + SDataSinkManager* pSinkManager = taosMemoryMalloc(sizeof(SDataSinkManager)); + if (NULL == pSinkManager) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pSinkManager->cfg = *cfg; + pSinkManager->pAPI = pAPI; + + *ppSinkManager = pSinkManager; return 0; // to avoid compiler eror } @@ -33,15 +38,16 @@ int32_t dsDataSinkGetCacheSize(SDataSinkStat* pStat) { return 0; } -int32_t dsCreateDataSinker(const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id) { +int32_t dsCreateDataSinker(void* pSinkManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id) { + SDataSinkManager* pManager = pSinkManager; switch ((int)nodeType(pDataSink)) { case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: - return createDataDispatcher(&gDataSinkManager, pDataSink, pHandle); + return createDataDispatcher(pManager, pDataSink, pHandle); case QUERY_NODE_PHYSICAL_PLAN_DELETE: { - return createDataDeleter(&gDataSinkManager, pDataSink, pHandle, pParam); + return createDataDeleter(pManager, pDataSink, pHandle, pParam); } case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: { - return createDataInserter(&gDataSinkManager, pDataSink, pHandle, pParam); + return createDataInserter(pManager, pDataSink, pHandle, pParam); } } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index a6059c7c42..9f5db5d6ae 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -512,7 +512,8 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, } SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50}; - code = dsDataSinkMgtInit(&cfg, &(*pTask)->storageAPI); + void* pSinkManager = NULL; + code = dsDataSinkMgtInit(&cfg, &(*pTask)->storageAPI, &pSinkManager); if (code != TSDB_CODE_SUCCESS) { qError("failed to dsDataSinkMgtInit, code:%s, %s", tstrerror(code), (*pTask)->id.str); goto _error; @@ -527,7 +528,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, } // pSinkParam has been freed during create sinker. - code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam, (*pTask)->id.str); + code = dsCreateDataSinker(pSinkManager, pSubplan->pDataSink, handle, pSinkParam, (*pTask)->id.str); } qDebug("subplan task create completed, TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, pSubplan->id.queryId); -- GitLab