diff --git a/include/libs/executor/dataSinkMgt.h b/include/libs/executor/dataSinkMgt.h index 0a9037d21cff5e87e5f0da3b0948651e291ff781..29ba019e4782734297fc08fef48a8341aff54b6f 100644 --- a/include/libs/executor/dataSinkMgt.h +++ b/include/libs/executor/dataSinkMgt.h @@ -59,7 +59,7 @@ typedef struct SDataSinkMgtCfg { uint32_t maxDataBlockNumPerQuery; } SDataSinkMgtCfg; -int32_t dsDataSinkMgtInit(SDataSinkMgtCfg* cfg, SStorageAPI* pAPI); +int32_t dsDataSinkMgtInit(SDataSinkMgtCfg* cfg, SStorageAPI* pAPI, void** ppSinkManager); typedef struct SInputData { const struct SSDataBlock* pData; @@ -83,7 +83,7 @@ typedef struct SOutputData { * @param pHandle output * @return error code */ -int32_t dsCreateDataSinker(const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id); +int32_t dsCreateDataSinker(void* pSinkManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id); int32_t dsDataSinkGetCacheSize(SDataSinkStat* pStat); diff --git a/source/libs/executor/src/dataDeleter.c b/source/libs/executor/src/dataDeleter.c index 11074b0e94b183612ec58004581ed696d5febf05..960ae14fcfdae97fcad9e70a4c7fd6eb9d9a4920 100644 --- a/source/libs/executor/src/dataDeleter.c +++ b/source/libs/executor/src/dataDeleter.c @@ -224,6 +224,8 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { } taosCloseQueue(pDeleter->pDataBlocks); taosThreadMutexDestroy(&pDeleter->mutex); + + taosMemoryFree(pDeleter->pManager); return TSDB_CODE_SUCCESS; } @@ -279,6 +281,8 @@ _end: if (deleter != NULL) { destroyDataSinker((SDataSinkHandle*)deleter); taosMemoryFree(deleter); + } else { + taosMemoryFree(pManager); } return code; } diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 2a22656d8ca70be98bffe008d1a455d25a0523a5..409ae50174faaa39326aff925f6b7d36a6272013 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -226,6 +226,7 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { } taosCloseQueue(pDispatcher->pDataBlocks); taosThreadMutexDestroy(&pDispatcher->mutex); + taosMemoryFree(pDispatcher->pManager); return TSDB_CODE_SUCCESS; } @@ -240,7 +241,7 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD SDataDispatchHandle* dispatcher = taosMemoryCalloc(1, sizeof(SDataDispatchHandle)); if (NULL == dispatcher) { terrno = TSDB_CODE_OUT_OF_MEMORY; - return TSDB_CODE_OUT_OF_MEMORY; + goto _return; } dispatcher->sink.fPut = putDataBlock; dispatcher->sink.fEndPut = endPut; @@ -257,8 +258,13 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD if (NULL == dispatcher->pDataBlocks) { taosMemoryFree(dispatcher); terrno = TSDB_CODE_OUT_OF_MEMORY; - return TSDB_CODE_OUT_OF_MEMORY; + goto _return; } *pHandle = dispatcher; return TSDB_CODE_SUCCESS; + +_return: + + taosMemoryFree(pManager); + return terrno; } diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index 646964ebf4eaf38b464bba680301b882ceb9d449..8a70726cc8cc41a83603b3836533e1c120f865fe 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -395,6 +395,8 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { taosMemoryFree(pInserter->pParam); taosHashCleanup(pInserter->pCols); taosThreadMutexDestroy(&pInserter->mutex); + + taosMemoryFree(pInserter->pManager); return TSDB_CODE_SUCCESS; } @@ -411,7 +413,7 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat if (NULL == inserter) { taosMemoryFree(pParam); terrno = TSDB_CODE_OUT_OF_MEMORY; - return TSDB_CODE_OUT_OF_MEMORY; + goto _return; } SQueryInserterNode* pInserterNode = (SQueryInserterNode*)pDataSink; @@ -431,23 +433,18 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat int64_t suid = 0; int32_t code = pManager->pAPI->metaFn.getTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId, &inserter->pSchema, &suid); if (code) { - destroyDataSinker((SDataSinkHandle*)inserter); - taosMemoryFree(inserter); - return code; + terrno = code; + goto _return; } if (pInserterNode->stableId != suid) { - destroyDataSinker((SDataSinkHandle*)inserter); - taosMemoryFree(inserter); terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; - return terrno; + goto _return; } inserter->pDataBlocks = taosArrayInit(1, POINTER_BYTES); taosThreadMutexInit(&inserter->mutex, NULL); if (NULL == inserter->pDataBlocks) { - destroyDataSinker((SDataSinkHandle*)inserter); - taosMemoryFree(inserter); terrno = TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY; } @@ -471,4 +468,15 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat *pHandle = inserter; return TSDB_CODE_SUCCESS; + +_return: + + if (inserter) { + destroyDataSinker((SDataSinkHandle*)inserter); + taosMemoryFree(inserter); + } else { + taosMemoryFree(pManager); + } + + return terrno; } diff --git a/source/libs/executor/src/dataSinkMgt.c b/source/libs/executor/src/dataSinkMgt.c index 3a972c1c20406d8d61fde988e4ed93140c46c0fa..b2cbf4c1a2c5129f237442b2131d0ad366c4a452 100644 --- a/source/libs/executor/src/dataSinkMgt.c +++ b/source/libs/executor/src/dataSinkMgt.c @@ -18,12 +18,17 @@ #include "planner.h" #include "tarray.h" -static SDataSinkManager gDataSinkManager = {0}; SDataSinkStat gDataSinkStat = {0}; -int32_t dsDataSinkMgtInit(SDataSinkMgtCfg* cfg, SStorageAPI* pAPI) { - gDataSinkManager.cfg = *cfg; - gDataSinkManager.pAPI = pAPI; +int32_t dsDataSinkMgtInit(SDataSinkMgtCfg* cfg, SStorageAPI* pAPI, void** ppSinkManager) { + SDataSinkManager* pSinkManager = taosMemoryMalloc(sizeof(SDataSinkManager)); + if (NULL == pSinkManager) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pSinkManager->cfg = *cfg; + pSinkManager->pAPI = pAPI; + + *ppSinkManager = pSinkManager; return 0; // to avoid compiler eror } @@ -33,18 +38,22 @@ int32_t dsDataSinkGetCacheSize(SDataSinkStat* pStat) { return 0; } -int32_t dsCreateDataSinker(const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id) { +int32_t dsCreateDataSinker(void* pSinkManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id) { + SDataSinkManager* pManager = pSinkManager; switch ((int)nodeType(pDataSink)) { case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: - return createDataDispatcher(&gDataSinkManager, pDataSink, pHandle); + return createDataDispatcher(pManager, pDataSink, pHandle); case QUERY_NODE_PHYSICAL_PLAN_DELETE: { - return createDataDeleter(&gDataSinkManager, pDataSink, pHandle, pParam); + return createDataDeleter(pManager, pDataSink, pHandle, pParam); } case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: { - return createDataInserter(&gDataSinkManager, pDataSink, pHandle, pParam); + return createDataInserter(pManager, pDataSink, pHandle, pParam); } + default: + break; } + taosMemoryFree(pSinkManager); qError("invalid input node type:%d, %s", nodeType(pDataSink), id); return TSDB_CODE_QRY_INVALID_INPUT; } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index a6059c7c4200551c654847cb00c3f738be9e36fc..28ee8f4b7ab297e71600d21202e7340af7c661f8 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -511,23 +511,25 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, goto _error; } - SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50}; - code = dsDataSinkMgtInit(&cfg, &(*pTask)->storageAPI); - if (code != TSDB_CODE_SUCCESS) { - qError("failed to dsDataSinkMgtInit, code:%s, %s", tstrerror(code), (*pTask)->id.str); - goto _error; - } - if (handle) { + SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50}; + void* pSinkManager = NULL; + code = dsDataSinkMgtInit(&cfg, &(*pTask)->storageAPI, &pSinkManager); + if (code != TSDB_CODE_SUCCESS) { + qError("failed to dsDataSinkMgtInit, code:%s, %s", tstrerror(code), (*pTask)->id.str); + goto _error; + } + void* pSinkParam = NULL; code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, (*pTask), readHandle); if (code != TSDB_CODE_SUCCESS) { qError("failed to createDataSinkParam, vgId:%d, code:%s, %s", vgId, tstrerror(code), (*pTask)->id.str); + taosMemoryFree(pSinkManager); goto _error; } // pSinkParam has been freed during create sinker. - code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam, (*pTask)->id.str); + code = dsCreateDataSinker(pSinkManager, pSubplan->pDataSink, handle, pSinkParam, (*pTask)->id.str); } qDebug("subplan task create completed, TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, pSubplan->id.queryId);