diff --git a/include/libs/executor/dataSinkMgt.h b/include/libs/executor/dataSinkMgt.h index 816a53ad9626a8a49da940c62cd231aa6aa90da3..8a02f372d1d605b359482b5e27810e6f95488433 100644 --- a/include/libs/executor/dataSinkMgt.h +++ b/include/libs/executor/dataSinkMgt.h @@ -84,7 +84,7 @@ typedef struct SOutputData { * @param pHandle output * @return error code */ -int32_t dsCreateDataSinker(const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam); +int32_t dsCreateDataSinker(const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id); int32_t dsDataSinkGetCacheSize(SDataSinkStat* pStat); diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index a11c67c1fd7d652c77866a07758b8b48ca754fb0..513c54c7e9a5a7fbc513cfe55e397e48f6018b9e 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -893,16 +893,26 @@ void tTagFree(STag *pTag) { } char *tTagValToData(const STagVal *value, bool isJson) { - if (!value) return NULL; + if (!value) { + return NULL; + } + char *data = NULL; int8_t typeBytes = 0; if (isJson) { typeBytes = CHAR_BYTES; } + if (IS_VAR_DATA_TYPE(value->type)) { data = taosMemoryCalloc(1, typeBytes + VARSTR_HEADER_SIZE + value->nData); - if (data == NULL) return NULL; - if (isJson) *data = value->type; + if (data == NULL) { + return NULL; + } + + if (isJson) { + *data = value->type; + } + varDataLen(data + typeBytes) = value->nData; memcpy(varDataVal(data + typeBytes), value->pData, value->nData); } else { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 11cae00358392d6d519c8da3ec078708e168ea7d..88bbf67758227800d71f50fde6abd04d21ed67ab 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1523,9 +1523,9 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* return pReader->pMemSchema; } - taosMemoryFree(pReader->pMemSchema); + taosMemoryFreeClear(pReader->pMemSchema); int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema); - if (code != TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_SUCCESS || pReader->pMemSchema == NULL) { terrno = code; return NULL; } else { @@ -2274,7 +2274,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { } _end: - pResBlock->info.uid = pBlockScanInfo->uid; + pResBlock->info.uid = (pBlockScanInfo != NULL)? pBlockScanInfo->uid:0; blockDataUpdateTsWindow(pResBlock, 0); setComposedBlockFlag(pReader, true); @@ -2569,7 +2569,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { } if (pScanInfo == NULL) { - tsdbError("failed to get table, uid:%" PRIu64 ", %s", pBlockInfo->uid, pReader->idStr); + tsdbError("failed to get table scan-info, %s", pReader->idStr); code = TSDB_CODE_INVALID_PARA; return code; } diff --git a/source/libs/executor/src/dataDeleter.c b/source/libs/executor/src/dataDeleter.c index 0714d0f3acd1703cdee50437ac82c92233627975..2ed83a6469645f685bd1be50b5896e14331e15bb 100644 --- a/source/libs/executor/src/dataDeleter.c +++ b/source/libs/executor/src/dataDeleter.c @@ -254,10 +254,12 @@ static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) { int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam) { + int32_t code = TSDB_CODE_SUCCESS; + SDataDeleterHandle* deleter = taosMemoryCalloc(1, sizeof(SDataDeleterHandle)); if (NULL == deleter) { - terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; - return TSDB_CODE_QRY_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; + goto _end; } SDataDeleterNode* pDeleterNode = (SDataDeleterNode*)pDataSink; @@ -270,17 +272,30 @@ int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pData deleter->pManager = pManager; deleter->pDeleter = pDeleterNode; deleter->pSchema = pDataSink->pInputDataBlockDesc; + + if(pParam == NULL) { + code = TSDB_CODE_QRY_INVALID_INPUT; + qError("invalid input param in creating data deleter, code%s", tstrerror(code)); + goto _end; + } + deleter->pParam = pParam; deleter->status = DS_BUF_EMPTY; deleter->queryEnd = false; deleter->pDataBlocks = taosOpenQueue(); taosThreadMutexInit(&deleter->mutex, NULL); if (NULL == deleter->pDataBlocks) { - terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; + goto _end; + } + + *pHandle = deleter; + return code; + + _end: + if (deleter != NULL) { destroyDataSinker((SDataSinkHandle*)deleter); taosMemoryFree(deleter); - return TSDB_CODE_QRY_OUT_OF_MEMORY; } - *pHandle = deleter; - return TSDB_CODE_SUCCESS; + return code; } diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index ffdcf48d48c704eccbb430c3bddd843f704d80f7..b758e4b1dd9dd36ac0f668729b9aa74e9b095c4d 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -231,8 +231,10 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { while (!taosQueueEmpty(pDispatcher->pDataBlocks)) { SDataDispatchBuf* pBuf = NULL; taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf); - taosMemoryFreeClear(pBuf->pData); - taosFreeQitem(pBuf); + if (pBuf != NULL) { + taosMemoryFreeClear(pBuf->pData); + taosFreeQitem(pBuf); + } } taosCloseQueue(pDispatcher->pDataBlocks); taosThreadMutexDestroy(&pDispatcher->mutex); diff --git a/source/libs/executor/src/dataSinkMgt.c b/source/libs/executor/src/dataSinkMgt.c index 206f3719fa0ad7a175d630c1caa346a8cd203c4c..2b50be33ad29d4cb11d764dd090d2126c1fb13a0 100644 --- a/source/libs/executor/src/dataSinkMgt.c +++ b/source/libs/executor/src/dataSinkMgt.c @@ -33,7 +33,7 @@ int32_t dsDataSinkGetCacheSize(SDataSinkStat* pStat) { return 0; } -int32_t dsCreateDataSinker(const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam) { +int32_t dsCreateDataSinker(const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id) { switch ((int)nodeType(pDataSink)) { case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: return createDataDispatcher(&gDataSinkManager, pDataSink, pHandle); @@ -42,7 +42,9 @@ int32_t dsCreateDataSinker(const SDataSinkNode* pDataSink, DataSinkHandle* pHand case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: return createDataInserter(&gDataSinkManager, pDataSink, pHandle, pParam); } - return TSDB_CODE_FAILED; + + qError("invalid input node type:%d, %s", nodeType(pDataSink), id); + return TSDB_CODE_QRY_INVALID_INPUT; } int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pContinue) { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index cad3e3c44c8068de7f521403ce0e3d17e02feddc..fb4248e886fdb3a1428f5aadd9966690fa3f0282 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -370,7 +370,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, goto _error; } - code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam); + code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam, (*pTask)->id.str); if (code != TSDB_CODE_SUCCESS) { taosMemoryFreeClear(pSinkParam); } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 1d99a32cf978539078d27caf6e19225bf0865939..c9711f13dd1015ae9c66f9fe93410253e6d2bde5 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3101,7 +3101,9 @@ _error: destroyAggOperatorInfo(pInfo); } + cleanupExprSupp(&pOperator->exprSupp); taosMemoryFreeClear(pOperator); + pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; return NULL; } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 53acf313308079a3f21571a9242008c03a12255c..60794fc22afbc23e662b0009bbc29eaed03ac73a 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -421,14 +421,14 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode goto _error; } - int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); + initResultSizeInfo(&pOperator->resultInfo, 4096); code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols); if (code != TSDB_CODE_SUCCESS) { goto _error; } - initResultSizeInfo(&pOperator->resultInfo, 4096); + int32_t num = 0; + SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, pInfo->groupKeyLen, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -453,7 +453,9 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode _error: pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; - destroyGroupOperatorInfo(pInfo); + if (pInfo != NULL) { + destroyGroupOperatorInfo(pInfo); + } taosMemoryFreeClear(pOperator); return NULL; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index fa71e8efa60d1a98b202ef28b31e30ee737ca8a6..4e1b07e66295e7914f0a306a937d8a2b0e8fdf23 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -465,16 +465,14 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int colDataAppendNNULL(pColInfoData, 0, pBlock->info.rows); } else if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) { colDataAppendNItems(pColInfoData, 0, data, pBlock->info.rows); + if (IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) { + taosMemoryFree(data); + } } else { // todo opt for json tag for (int32_t i = 0; i < pBlock->info.rows; ++i) { colDataAppend(pColInfoData, i, data, false); } } - - if (data && (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) && p != NULL && - IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) { - taosMemoryFree(data); - } } } diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index 7d54d243ef067f1d339bfaea2a73a28a1f18d0dc..d1d22dc3e5df7ef3ef74cf0f2eb4ab22a2cc2ed5 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -263,29 +263,14 @@ static void saveColData(SArray* rowBuf, int32_t columnIndex, const char* src, bo static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, int32_t rowIndex, SArray* pRow) { for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { int32_t type = pFillInfo->pFillCol[i].pExpr->pExpr->nodeType; - if (type == QUERY_NODE_COLUMN) { + if (type == QUERY_NODE_COLUMN || type == QUERY_NODE_OPERATOR || type == QUERY_NODE_FUNCTION) { int32_t srcSlotId = GET_DEST_SLOT_ID(&pFillInfo->pFillCol[i]); SColumnInfoData* pSrcCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId); bool isNull = colDataIsNull_s(pSrcCol, rowIndex); char* p = colDataGetData(pSrcCol, rowIndex); - saveColData(pRow, i, p, isNull); - } else if (type == QUERY_NODE_OPERATOR) { - int32_t srcSlotId = GET_DEST_SLOT_ID(&pFillInfo->pFillCol[i]); - - SColumnInfoData* pSrcCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId); - bool isNull = colDataIsNull_s(pSrcCol, rowIndex); - char* p = colDataGetData(pSrcCol, rowIndex); - saveColData(pRow, i, p, isNull); - } else if (type == QUERY_NODE_FUNCTION) { - int32_t srcSlotId = GET_DEST_SLOT_ID(&pFillInfo->pFillCol[i]); - - SColumnInfoData* pSrcCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId); - - bool isNull = colDataIsNull_s(pSrcCol, rowIndex); - char* p = colDataGetData(pSrcCol, rowIndex); saveColData(pRow, i, p, isNull); } else { ASSERT(0);