From f790d91b45346fd486f28166668bf8b0b8d7898e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 31 Oct 2022 10:41:25 +0800 Subject: [PATCH] fix(query): check null before free resource, and some internal refactor. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 4 +- source/libs/executor/inc/executorimpl.h | 6 +- source/libs/executor/src/executil.c | 4 ++ source/libs/executor/src/executorimpl.c | 74 +++++++------------------ source/libs/executor/src/scanoperator.c | 46 +++++++++++++-- 5 files changed, 70 insertions(+), 64 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 04ffe3d900..5f90c567be 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1867,7 +1867,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* if (minKey == key) { init = true; TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); - int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema); + code = tRowMergerInit(&merge, &fRow, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1881,7 +1881,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* tRowMerge(&merge, &fRow1); } else { init = true; - int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema); + code = tRowMergerInit(&merge, &fRow1, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index cad4b562b8..1191b6a485 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -555,7 +555,7 @@ typedef struct SSysTableScanInfo { typedef struct SBlockDistInfo { SSDataBlock* pResBlock; - void* pHandle; + STsdbReader* pHandle; SReadHandle readHandle; uint64_t uid; // table uid } SBlockDistInfo; @@ -970,8 +970,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid, - SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode, + SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 82427f6eff..64fe529b4f 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1772,6 +1772,10 @@ _error: } void* tableListDestroy(STableListInfo* pTableListInfo) { + if (pTableListInfo == NULL) { + return NULL; + } + pTableListInfo->pTableList = taosArrayDestroy(pTableListInfo->pTableList); taosMemoryFreeClear(pTableListInfo->groupOffset); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 8a2d92af02..11dbfaff9f 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3378,34 +3378,11 @@ bool groupbyTbname(SNodeList* pGroupList) { return bytbname; } -static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pCond) { - memset(pCond, 0, sizeof(SQueryTableDataCond)); - - pCond->order = TSDB_ORDER_ASC; - pCond->numOfCols = 1; - pCond->colList = taosMemoryCalloc(1, sizeof(SColumnInfo)); - if (pCond->colList == NULL) { - terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; - return terrno; - } - - pCond->colList->colId = 1; - pCond->colList->type = TSDB_DATA_TYPE_TIMESTAMP; - pCond->colList->bytes = sizeof(TSKEY); - - pCond->twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX}; - pCond->suid = uid; - pCond->type = TIMEWINDOW_RANGE_CONTAINED; - pCond->startVersion = -1; - pCond->endVersion = -1; - - return TSDB_CODE_SUCCESS; -} - -SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, - STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, - const char* pUser) { +SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond, + SNode* pTagIndexCond, const char* pUser) { int32_t type = nodeType(pPhyNode); + STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList; + const char* idstr = GET_TASKID(pTaskInfo); if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { SOperatorInfo* pOperator = NULL; @@ -3420,10 +3397,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle, - pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); + pTableListInfo, pTagCond, pTagIndexCond, idstr); if (code) { pTaskInfo->code = code; - qError("failed to createScanTableListInfo, code:%s, %s", tstrerror(code), GET_TASKID(pTaskInfo)); + qError("failed to createScanTableListInfo, code:%s, %s", tstrerror(code), idstr); return NULL; } @@ -3440,7 +3417,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode; int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, /*pTableScanNode->groupSort*/true, pHandle, - pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); + pTableListInfo, pTagCond, pTagIndexCond, idstr); if (code) { pTaskInfo->code = code; qError("failed to createScanTableListInfo, code: %s", tstrerror(code)); @@ -3465,7 +3442,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo if (pHandle->vnode) { int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, - pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); + pHandle, pTableListInfo, pTagCond, pTagIndexCond, idstr); if (code) { pTaskInfo->code = code; qError("failed to createScanTableListInfo, code: %s", tstrerror(code)); @@ -3492,7 +3469,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode; int32_t code = createScanTableListInfo(pScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond, - pTagIndexCond, GET_TASKID(pTaskInfo)); + pTagIndexCond, idstr); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; qError("failed to getTableList, code: %s", tstrerror(code)); @@ -3516,30 +3493,16 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo tableListAddTableInfo(pTableListInfo, p->uid, 0); } taosArrayDestroy(pList); - } else { // Create one table group. + } else { // Create group with only one table tableListAddTableInfo(pTableListInfo, pBlockNode->uid, 0); } - SQueryTableDataCond cond = {0}; - - int32_t code = initTableblockDistQueryCond(pBlockNode->suid, &cond); - if (code != TSDB_CODE_SUCCESS) { - return NULL; - } - - size_t num = tableListGetSize(pTableListInfo); - void* pList = tableListGetInfo(pTableListInfo, 0); - - STsdbReader* pReader = NULL; - tsdbReaderOpen(pHandle->vnode, &cond, pList, num, &pReader, ""); - cleanupQueryTableDataCond(&cond); - - pOperator = createDataBlockInfoScanOperator(pReader, pHandle, cond.suid, pBlockNode, pTaskInfo); + pOperator = createDataBlockInfoScanOperator(pHandle, pBlockNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) { SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode; int32_t code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo, - pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); + pTagCond, pTagIndexCond, idstr); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; return NULL; @@ -3566,17 +3529,18 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } size_t size = LIST_LENGTH(pPhyNode->pChildren); - SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES); + if (ops == NULL) { + return NULL; + } + for (int32_t i = 0; i < size; ++i) { SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); - ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, pTableListInfo, pTagCond, pTagIndexCond, pUser); + ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, pTagCond, pTagIndexCond, pUser); if (ops[i] == NULL) { taosMemoryFree(ops); return NULL; } - - ops[i]->resultDataBlockId = pChildNode->pOutputDataBlockDesc->dataBlockId; } SOperatorInfo* pOptr = NULL; @@ -3888,8 +3852,8 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead sql = NULL; (*pTaskInfo)->pSubplan = pPlan; - (*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, (*pTaskInfo)->pTableInfoList, - pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user); + (*pTaskInfo)->pRoot = + createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user); if (NULL == (*pTaskInfo)->pRoot) { code = (*pTaskInfo)->code; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index e254083d88..b9324d0c64 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1000,8 +1000,31 @@ static void destroyBlockDistScanOperatorInfo(void* param) { taosMemoryFreeClear(param); } -SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid, - SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo) { +static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pCond) { + memset(pCond, 0, sizeof(SQueryTableDataCond)); + + pCond->order = TSDB_ORDER_ASC; + pCond->numOfCols = 1; + pCond->colList = taosMemoryCalloc(1, sizeof(SColumnInfo)); + if (pCond->colList == NULL) { + terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; + return terrno; + } + + pCond->colList->colId = 1; + pCond->colList->type = TSDB_DATA_TYPE_TIMESTAMP; + pCond->colList->bytes = sizeof(TSKEY); + + pCond->twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX}; + pCond->suid = uid; + pCond->type = TIMEWINDOW_RANGE_CONTAINED; + pCond->startVersion = -1; + pCond->endVersion = -1; + + return TSDB_CODE_SUCCESS; +} + +SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo) { SBlockDistInfo* pInfo = taosMemoryCalloc(1, sizeof(SBlockDistInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -1009,9 +1032,24 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* re goto _error; } - pInfo->pHandle = dataReader; + { + SQueryTableDataCond cond = {0}; + + int32_t code = initTableblockDistQueryCond(pBlockScanNode->suid, &cond); + if (code != TSDB_CODE_SUCCESS) { + return NULL; + } + + STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList; + size_t num = tableListGetSize(pTableListInfo); + void* pList = tableListGetInfo(pTableListInfo, 0); + + tsdbReaderOpen(readHandle->vnode, &cond, pList, num, &pInfo->pHandle, pTaskInfo->id.str); + cleanupQueryTableDataCond(&cond); + } + pInfo->readHandle = *readHandle; - pInfo->uid = uid; + pInfo->uid = pBlockScanNode->suid; pInfo->pResBlock = createResDataBlock(pBlockScanNode->node.pOutputDataBlockDesc); int32_t numOfCols = 0; -- GitLab