diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 6204b9dcfca0be850d036f0eeb1687f7ba6148bb..2a0d4e7ff69e79b10f7c0f0d47aa818f918c8a38 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -71,8 +71,8 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet); #define colDataGetData(p1_, r_) \ ((IS_VAR_DATA_TYPE((p1_)->info.type)) ? colDataGetVarData(p1_, r_) : colDataGetNumData(p1_, r_)) -#define IS_JSON_NULL(type, data) ((type) == TSDB_DATA_TYPE_JSON && \ - (*(data) == TSDB_DATA_TYPE_NULL || tTagIsJsonNull(data))) +#define IS_JSON_NULL(type, data) \ + ((type) == TSDB_DATA_TYPE_JSON && (*(data) == TSDB_DATA_TYPE_NULL || tTagIsJsonNull(data))) static FORCE_INLINE bool colDataIsNull_s(const SColumnInfoData* pColumnInfoData, uint32_t row) { if (!pColumnInfoData->hasNull) { @@ -186,7 +186,8 @@ int32_t getJsonValueLen(const char* data); int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull); int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, uint32_t* capacity, const SColumnInfoData* pSource, uint32_t numOfRow2); -int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows, const SDataBlockInfo* pBlockInfo); +int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows, + const SDataBlockInfo* pBlockInfo); int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex); int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows); @@ -222,6 +223,7 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize); int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n); +int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src); SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData); SSDataBlock* createDataBlock(); int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 7673e730356fcb75cfa38f41521b5c41d596d6e6..a47810e7c1a918ef4ce9795fdbd08bf45ce363ce 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -344,10 +344,12 @@ static FORCE_INLINE int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBloc if (pTask->sinkType == TASK_SINK__TABLE) { ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE); pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks); + taosArrayDestroyEx(pBlock->blocks, (FDelete)tDeleteSSDataBlock); taosFreeQitem(pBlock); } else if (pTask->sinkType == TASK_SINK__SMA) { ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE); pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks); + taosArrayDestroyEx(pBlock->blocks, (FDelete)tDeleteSSDataBlock); taosFreeQitem(pBlock); } else { ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 8317c6c79daedd59a99652b638ce40e64f4ebdaf..593f8c5c0bb55975d29957c2ea80349c152ac4fd 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -263,7 +263,7 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, ui pColumnInfoData->varmeta.length = len + oldLen; } else { if (finalNumOfRows > *capacity) { - ASSERT(finalNumOfRows*pColumnInfoData->info.bytes); + ASSERT(finalNumOfRows * pColumnInfoData->info.bytes); char* tmp = taosMemoryRealloc(pColumnInfoData->pData, finalNumOfRows * pColumnInfoData->info.bytes); if (tmp == NULL) { return TSDB_CODE_VND_OUT_OF_MEMORY; @@ -293,7 +293,8 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, ui return numOfRow1 + numOfRow2; } -int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows, const SDataBlockInfo* pBlockInfo) { +int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows, + const SDataBlockInfo* pBlockInfo) { ASSERT(pColumnInfoData != NULL && pSource != NULL && pColumnInfoData->info.type == pSource->info.type); if (numOfRows <= 0) { return numOfRows; @@ -327,9 +328,7 @@ int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* p return 0; } -size_t blockDataGetNumOfCols(const SSDataBlock* pBlock) { - return taosArrayGetSize(pBlock->pDataBlock); -} +size_t blockDataGetNumOfCols(const SSDataBlock* pBlock) { return taosArrayGetSize(pBlock->pDataBlock); } size_t blockDataGetNumOfRows(const SSDataBlock* pBlock) { return pBlock->info.rows; } @@ -396,7 +395,7 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd int32_t pageSize) { ASSERT(pBlock != NULL && stopIndex != NULL); - size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); + size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); int32_t numOfRows = pBlock->info.rows; int32_t bitmapChar = 1; @@ -512,7 +511,7 @@ int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) { // write the number of rows *(uint32_t*)buf = pBlock->info.rows; - size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); + size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); int32_t numOfRows = pBlock->info.rows; char* pStart = buf + sizeof(uint32_t); @@ -542,7 +541,7 @@ int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) { int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) { pBlock->info.rows = *(int32_t*)buf; - size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); + size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); const char* pStart = buf + sizeof(uint32_t); for (int32_t i = 0; i < numOfCols; ++i) { @@ -734,7 +733,7 @@ int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) { static int32_t doAssignOneTuple(SColumnInfoData* pDstCols, int32_t numOfRows, const SSDataBlock* pSrcBlock, int32_t tupleIndex) { int32_t code = 0; - size_t numOfCols = taosArrayGetSize(pSrcBlock->pDataBlock); + size_t numOfCols = taosArrayGetSize(pSrcBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pDst = &pDstCols[i]; @@ -794,7 +793,7 @@ static int32_t blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataB static SColumnInfoData* createHelpColInfoData(const SSDataBlock* pDataBlock) { int32_t rows = pDataBlock->info.rows; - size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); + size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); SColumnInfoData* pCols = taosMemoryCalloc(numOfCols, sizeof(SColumnInfoData)); if (pCols == NULL) { @@ -902,7 +901,6 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) { } else { // var data type } } else if (numOfCols == 2) { - } } @@ -1103,14 +1101,14 @@ void blockDataCleanup(SSDataBlock* pDataBlock) { } } -static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo *pBlockInfo, uint32_t numOfRows) { +static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows) { ASSERT(numOfRows > 0 && pBlockInfo->capacity >= pBlockInfo->rows); if (numOfRows < pBlockInfo->capacity) { return TSDB_CODE_SUCCESS; } // todo temp disable it -// ASSERT(pColumn->info.bytes != 0); + // ASSERT(pColumn->info.bytes != 0); int32_t existedRows = pBlockInfo->rows; @@ -1141,7 +1139,7 @@ static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo * if (tmp == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - + memset(tmp + pColumn->info.bytes * existedRows, 0, pColumn->info.bytes * (numOfRows - existedRows)); pColumn->pData = tmp; } @@ -1197,6 +1195,40 @@ void* blockDataDestroy(SSDataBlock* pBlock) { taosMemoryFreeClear(pBlock); return NULL; } +int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src) { + ASSERT(src != NULL); + + dst->info = src->info; + dst->info.rows = 0; + dst->info.capacity = 0; + + size_t numOfCols = taosArrayGetSize(src->pDataBlock); + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* p = taosArrayGet(src->pDataBlock, i); + SColumnInfoData colInfo = {.hasNull = true, .info = p->info}; + blockDataAppendColInfo(dst, &colInfo); + } + + int32_t code = blockDataEnsureCapacity(dst, src->info.rows); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return -1; + } + + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pDst = taosArrayGet(dst->pDataBlock, i); + SColumnInfoData* pSrc = taosArrayGet(src->pDataBlock, i); + if (pSrc->pData == NULL) { + continue; + } + + colDataAssign(pDst, pSrc, src->info.rows, &src->info); + } + + dst->info.rows = src->info.rows; + dst->info.capacity = src->info.rows; + return 0; +} SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { if (pDataBlock == NULL) { @@ -1272,7 +1304,7 @@ int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoDat } // todo disable it temporarily -// ASSERT(pColInfoData->info.type != 0); + // ASSERT(pColInfoData->info.type != 0); if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { pBlock->info.hasVarCol = true; } @@ -1284,7 +1316,7 @@ int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoDat SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId) { SColumnInfoData col = {.hasNull = true}; col.info.colId = colId; - col.info.type = type; + col.info.type = type; col.info.bytes = bytes; return col; @@ -1552,9 +1584,9 @@ void blockDebugShowData(const SArray* dataBlocks, const char* flag) { int32_t sz = taosArrayGetSize(dataBlocks); for (int32_t i = 0; i < sz; i++) { SSDataBlock* pDataBlock = taosArrayGet(dataBlocks, i); - size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); + size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); - int32_t rows = pDataBlock->info.rows; + int32_t rows = pDataBlock->info.rows; printf("%s |block type %d |child id %d|\n", flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId); for (int32_t j = 0; j < rows; j++) { printf("%s |", flag); @@ -1633,8 +1665,8 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks SSDataBlock* pDataBlock = taosArrayGet(pDataBlocks, i); int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); int32_t rows = pDataBlock->info.rows; -// int32_t rowSize = pDataBlock->info.rowSize; -// int64_t groupId = pDataBlock->info.groupId; + // int32_t rowSize = pDataBlock->info.rowSize; + // int64_t groupId = pDataBlock->info.groupId; if (colNum <= 1) { // invalid if only with TS col diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index da052ee3eb7723ab3d23ffc348d09384801ed578..be8fef1249cae441fb267837e7729f3e7b87c7d7 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -66,6 +66,7 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataBlkRsp* pRsp if (qStreamScanSnapshot(task) < 0) { ASSERT(0); } + // set version while (1) { SSDataBlock* pDataBlock = NULL; uint64_t ts = 0; diff --git a/source/dnode/vnode/src/tq/tqOffset.c b/source/dnode/vnode/src/tq/tqOffset.c index e5475d7c300531b7b3e0d8cce369fa9eb0415930..e536e87032af55b529477f6b2051db07677b6902 100644 --- a/source/dnode/vnode/src/tq/tqOffset.c +++ b/source/dnode/vnode/src/tq/tqOffset.c @@ -44,6 +44,7 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) { } char* fname = buildFileName(pStore->pTq->path); TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ); + taosMemoryFree(fname); if (pFile != NULL) { STqOffsetHead head = {0}; int64_t code; @@ -77,7 +78,6 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) { } taosCloseFile(&pFile); - taosMemoryFree(fname); } return pStore; } @@ -102,6 +102,7 @@ int32_t tqOffsetSnapshot(STqOffsetStore* pStore) { // TODO file name should be with a version char* fname = buildFileName(pStore->pTq->path); TdFilePtr pFile = taosOpenFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); + taosMemoryFree(fname); if (pFile == NULL) { ASSERT(0); return -1; @@ -140,6 +141,5 @@ int32_t tqOffsetSnapshot(STqOffsetStore* pStore) { } // close and rename file taosCloseFile(&pFile); - taosMemoryFree(fname); return 0; } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 5e5a4883a9d91f63601dca9db9da7252563a3164..5d2fb81cf0669e07f703fc0006416353ada7bc2d 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -151,6 +151,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle* pHandle, uint64_ int32_t sversion = htonl(pHandle->pBlock->sversion); if (pHandle->cachedSchemaSuid == 0 || pHandle->cachedSchemaVer != sversion || pHandle->cachedSchemaSuid != pHandle->msgIter.suid) { + if (pHandle->pSchema) taosMemoryFree(pHandle->pSchema); pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion); if (pHandle->pSchema == NULL) { tqWarn("cannot found tsschema for table: uid: %ld (suid: %ld), version %d, possibly dropped table", @@ -161,6 +162,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle* pHandle, uint64_ } // this interface use suid instead of uid + if (pHandle->pSchemaWrapper) tDeleteSSchemaWrapper(pHandle->pSchemaWrapper); pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion, true); if (pHandle->pSchemaWrapper == NULL) { tqWarn("cannot found schema wrapper for table: suid: %ld, version %d, possibly dropped table", @@ -184,7 +186,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle* pHandle, uint64_ while (colMeta < pSchemaWrapper->nCols) { SSchema* pColSchema = &pSchemaWrapper->pSchema[colMeta]; SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId); - int32_t code = blockDataAppendColInfo(pBlock, &colInfo); + int32_t code = blockDataAppendColInfo(pBlock, &colInfo); if (code != TSDB_CODE_SUCCESS) { goto FAIL; } @@ -207,7 +209,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle* pHandle, uint64_ colNeed++; } else { SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId); - int32_t code = blockDataAppendColInfo(pBlock, &colInfo); + int32_t code = blockDataAppendColInfo(pBlock, &colInfo); if (code != TSDB_CODE_SUCCESS) { goto FAIL; } @@ -251,8 +253,8 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReadHandle* pHandle, uint64_ } return 0; -FAIL: // todo refactor here -// if (*ppCols) taosArrayDestroy(*ppCols); +FAIL: // todo refactor here + // if (*ppCols) taosArrayDestroy(*ppCols); return -1; } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 91931c2fd81e4895eac2c9f134eb44efb648edb6..57a2c57b163952627f0b5dd818f489d367da8363 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -336,7 +336,6 @@ typedef struct SStreamBlockScanInfo { int32_t numOfPseudoExpr; int32_t primaryTsIndex; // primary time stamp slot id - void* pDataReader; SReadHandle readHandle; uint64_t tableUid; // queried super table uid EStreamScanMode scanMode; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 962b7b9c518991d92ed335acdf7a836a65708f18..afe0a1f8fa9391857df89ebae4ad74fbc635a76f 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1341,7 +1341,7 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR } if (rowRes != NULL) { - int32_t totalRows = pBlock->info.rows; + int32_t totalRows = pBlock->info.rows; SSDataBlock* px = createOneDataBlock(pBlock, true); size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); @@ -3954,7 +3954,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, } } } - int32_t len = (int32_t)(pStart - (char*)keyBuf); + int32_t len = (int32_t)(pStart - (char*)keyBuf); uint64_t* pGroupId = taosHashGet(pTableListInfo->map, keyBuf, len); @@ -3980,8 +3980,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) { STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; - tsdbReaderT pDataReader = - doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId); + tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId); if (pDataReader == NULL && terrno != 0) { pTaskInfo->code = terrno; return NULL; @@ -4007,16 +4006,20 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo STableScanInfo* pScanInfo = pOperator->info; pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; return pOperator; + } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) { STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode; createScanTableListInfo(pTableScanNode, pHandle, pTableListInfo, queryId, taskId); extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo); - SOperatorInfo* pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo, queryId, taskId); + SOperatorInfo* pOperator = + createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo, queryId, taskId); STableScanInfo* pScanInfo = pOperator->info; pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; return pOperator; + } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) { return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table. STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; @@ -4031,20 +4034,22 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo if (pHandle->initTsdbReader) { // for stream ASSERT(pHandle->vnode); - pDataReader = - doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId); + pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId); } else { // for tq ASSERT(pHandle->meta); getTableList(pHandle->meta, pScanPhyNode, pTableListInfo); } } + +#if 0 if (pDataReader == NULL && terrno != 0) { qDebug("%s pDataReader is NULL", GET_TASKID(pTaskInfo)); // return NULL; } else { qDebug("%s pDataReader is not NULL", GET_TASKID(pTaskInfo)); } +#endif SArray* groupKeys = extractPartitionColInfo(pTableScanNode->pPartitionTags); int32_t code = generateGroupIdMap(pTableListInfo, pHandle, groupKeys); // todo for json @@ -4057,9 +4062,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SOperatorInfo* pOperator = createStreamScanOperatorInfo(pDataReader, pHandle, pTableScanNode, pTaskInfo, &twSup); return pOperator; + } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) { SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode; return createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) { STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode; @@ -4070,6 +4077,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } return createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) { SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode; pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo)); @@ -4343,7 +4351,7 @@ tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* goto _error; } - tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo, queryId, taskId); + tsdbReaderT pReader = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo, queryId, taskId); cleanupQueryTableDataCond(&cond); return pReader; @@ -4487,18 +4495,14 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead (*pTaskInfo)->sql = sql; (*pTaskInfo)->tableqinfoList.pTagCond = pPlan->pTagCond; (*pTaskInfo)->tableqinfoList.pTagIndexCond = pPlan->pTagIndexCond; - (*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, - &(*pTaskInfo)->tableqinfoList); + (*pTaskInfo)->pRoot = + createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &(*pTaskInfo)->tableqinfoList); + if (NULL == (*pTaskInfo)->pRoot) { code = (*pTaskInfo)->code; goto _complete; } - if ((*pTaskInfo)->pRoot == NULL) { - code = TSDB_CODE_QRY_OUT_OF_MEMORY; - goto _complete; - } - return code; _complete: diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b521983d8e83a88962d871ab05372c4ee8143d38..07212bc0186f1ec09f379a12cf098727d60af37e 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -967,9 +967,9 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { while (tqNextDataBlock(pInfo->streamBlockReader)) { SSDataBlock block = {0}; - uint64_t groupId = 0; - uint64_t uid = 0; - int32_t numOfRows = 0; + uint64_t groupId = 0; + uint64_t uid = 0; + int32_t numOfRows = 0; // todo refactor int32_t code = tqRetrieveDataBlock(&block, pInfo->streamBlockReader, &groupId, &uid, &numOfRows); @@ -1022,6 +1022,9 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { } } + // TODO refactor @liao + taosArrayDestroy(block.pDataBlock); + if (pInfo->pRes->pDataBlock == NULL) { // TODO add log pOperator->status = OP_EXEC_DONE; @@ -1061,12 +1064,11 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { } return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes; + } else if (pInfo->blockType == STREAM_DATA_TYPE_FROM_SNAPSHOT) { SSDataBlock* pResult = doTableScan(pInfo->pSnapshotReadOp); - if (pResult) { - return pResult->info.rows > 0 ? pResult : NULL; - } - return NULL; + return pResult && pResult->info.rows > 0 ? pResult : NULL; + } else { ASSERT(0); return NULL; @@ -1161,7 +1163,6 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan pInfo->pRes = createResDataBlock(pDescNode); pInfo->pUpdateRes = createResDataBlock(pDescNode); pInfo->pCondition = pScanPhyNode->node.pConditions; - pInfo->pDataReader = pDataReader; pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; pInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = NULL, .gap = -1}; pInfo->groupId = 0; @@ -1343,7 +1344,8 @@ static SSDataBlock* buildSysTableMetaBlock() { SSDataBlock* pBlock = createDataBlock(); for (int32_t i = 0; i < pMeta[index].colNum; ++i) { - SColumnInfoData colInfoData = createColumnInfoData(pMeta[index].schema[i].type, pMeta[index].schema[i].bytes, i + 1); + SColumnInfoData colInfoData = + createColumnInfoData(pMeta[index].schema[i].type, pMeta[index].schema[i].bytes, i + 1); blockDataAppendColInfo(pBlock, &colInfoData); } diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index 9654e21d24a3768a69d8452715fd56fac73b2f2d..2f41c083549d0cc6f060fe888df5fd697099aa95 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -34,7 +34,7 @@ int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb); int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb); int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData); int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData); -int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcMsg* pMsg, SEpSet** ppEpSet); +int32_t streamBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* data, SRpcMsg* pMsg, SEpSet** ppEpSet); int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index b5a9e221a74195e4799efab35ae3d2e52bf09efa..11cd089606a2ff33d256ed6916524ef6f4dd0541 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -172,7 +172,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis pRetrieve->streamBlockType = pBlock->info.type; pRetrieve->numOfRows = htonl(pBlock->info.rows); - int32_t numOfCols = (int32_t) taosArrayGetSize(pBlock->pDataBlock); + int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock); pRetrieve->numOfCols = htonl(numOfCols); int32_t actualLen = 0; @@ -185,7 +185,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis return 0; } -int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcMsg* pMsg, SEpSet** ppEpSet) { +int32_t streamBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* data, SRpcMsg* pMsg, SEpSet** ppEpSet) { void* buf = NULL; int32_t code = -1; int32_t blockNum = taosArrayGetSize(data->blocks); @@ -307,6 +307,8 @@ int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) { atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); return -1; } + taosArrayDestroyEx(pBlock->blocks, (FDelete)tDeleteSSDataBlock); + taosFreeQitem(pBlock); tmsgSendReq(pEpSet, &dispatchMsg); return 0; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 6fada59c84e38392db20e6862bb847468de46eb2..cb7dd241c1912d0f5e2dca88be9562e3fca095e3 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -53,9 +53,13 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) } // TODO: do we need free memory? - SSDataBlock* outputCopy = createOneDataBlock(output, true); - outputCopy->info.childId = pTask->selfChildId; - taosArrayPush(pRes, outputCopy); + SSDataBlock block = {0}; + assignOneDataBlock(&block, output); + block.info.childId = pTask->selfChildId; + taosArrayPush(pRes, &block); + /*SSDataBlock* outputCopy = createOneDataBlock(output, true);*/ + /*outputCopy->info.childId = pTask->selfChildId;*/ + /*taosArrayPush(pRes, outputCopy);*/ } return 0; } @@ -68,6 +72,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { streamTaskExecImpl(pTask, data, pRes); if (pTask->taskStatus == TASK_STATUS__DROPPING) { + taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock); return NULL; } @@ -82,25 +87,25 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { qRes->blocks = pRes; if (streamTaskOutput(pTask, qRes) < 0) { streamQueueProcessFail(pTask->inputQueue); - taosArrayDestroy(pRes); + taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock); taosFreeQitem(qRes); return NULL; } - - int8_t type = ((SStreamQueueItem*)data)->type; - if (type == STREAM_INPUT__TRIGGER) { - blockDataDestroy(((SStreamTrigger*)data)->pBlock); - taosFreeQitem(data); - } else if (type == STREAM_INPUT__DATA_BLOCK) { - taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)tDeleteSSDataBlock); - taosFreeQitem(data); - } else if (type == STREAM_INPUT__DATA_SUBMIT) { - ASSERT(pTask->isDataScan); - streamDataSubmitRefDec((SStreamDataSubmit*)data); - taosFreeQitem(data); - } streamQueueProcessSuccess(pTask->inputQueue); - return taosArrayInit(0, sizeof(SSDataBlock)); + pRes = taosArrayInit(0, sizeof(SSDataBlock)); + } + + int8_t type = ((SStreamQueueItem*)data)->type; + if (type == STREAM_INPUT__TRIGGER) { + blockDataDestroy(((SStreamTrigger*)data)->pBlock); + taosFreeQitem(data); + } else if (type == STREAM_INPUT__DATA_BLOCK) { + taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)tDeleteSSDataBlock); + taosFreeQitem(data); + } else if (type == STREAM_INPUT__DATA_SUBMIT) { + ASSERT(pTask->isDataScan); + streamDataSubmitRefDec((SStreamDataSubmit*)data); + taosFreeQitem(data); } } return pRes; @@ -125,14 +130,14 @@ int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) { pRes = streamExecForQall(pTask, pRes); if (pRes == NULL) goto FAIL; - taosArrayDestroy(pRes); + taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock); atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE); return 0; } else if (execStatus == TASK_EXEC_STATUS__CLOSING) { continue; } else if (execStatus == TASK_EXEC_STATUS__EXECUTING) { ASSERT(taosArrayGetSize(pRes) == 0); - taosArrayDestroy(pRes); + taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock); return 0; } else { ASSERT(0);