From d200fe5145fb821b3174b6af40f527b4c207bf51 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 6 Nov 2022 01:34:04 +0800 Subject: [PATCH] refactor: do some internal refactor. --- include/common/tdatablock.h | 3 ++ source/common/src/tdatablock.c | 25 +++++++------- source/common/src/trow.c | 4 +-- source/dnode/mnode/impl/src/mndShow.c | 15 ++++---- source/dnode/vnode/src/tsdb/tsdbRead.c | 1 - source/libs/executor/src/groupoperator.c | 5 ++- source/libs/executor/src/scanoperator.c | 44 ++++++++---------------- source/libs/executor/src/sortoperator.c | 10 +++--- 8 files changed, 46 insertions(+), 61 deletions(-) diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 72232610d3..fdbb2c5123 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -137,6 +137,9 @@ static FORCE_INLINE void colDataAppendNNULL(SColumnInfoData* pColumnInfoData, ui for (int32_t i = start; i < start + nRows; ++i) { colDataSetNull_f(pColumnInfoData->nullbitmap, i); } + + int32_t bytes = pColumnInfoData->info.bytes; + memset(pColumnInfoData->pData + start * bytes, 0, nRows * bytes); } pColumnInfoData->hasNull = true; diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 6a492a30c6..df0b9d2bf7 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1126,26 +1126,27 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF } void blockDataCleanup(SSDataBlock* pDataBlock) { - pDataBlock->info.rows = 0; - pDataBlock->info.groupId = 0; + SDataBlockInfo* pInfo = &pDataBlock->info; - pDataBlock->info.window.ekey = 0; - pDataBlock->info.window.skey = 0; + pInfo->rows = 0; + pInfo->groupId = 0; + pInfo->window.ekey = 0; + pInfo->window.skey = 0; - if (pDataBlock->info.capacity == 0) { + if (pInfo->capacity == 0) { return; } size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); - colInfoDataCleanup(p, pDataBlock->info.capacity); + colInfoDataCleanup(p, pInfo->capacity); } } static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows, bool clearPayload) { ASSERT(numOfRows > 0 && pBlockInfo->capacity >= pBlockInfo->rows); - if (numOfRows < pBlockInfo->capacity) { + if (numOfRows <= pBlockInfo->capacity) { return TSDB_CODE_SUCCESS; } @@ -1193,6 +1194,7 @@ static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) { pColumn->hasNull = false; + if (IS_VAR_DATA_TYPE(pColumn->info.type)) { pColumn->varmeta.length = 0; if (pColumn->varmeta.offset != NULL) { @@ -1212,23 +1214,20 @@ int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows, int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) { int32_t code = 0; - if (numOfRows == 0) { + if (numOfRows == 0 || numOfRows <= pDataBlock->info.capacity) { return TSDB_CODE_SUCCESS; } - if (pDataBlock->info.capacity < numOfRows) { - pDataBlock->info.capacity = numOfRows; - } - size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); - code = doEnsureCapacity(p, &pDataBlock->info, numOfRows, false); + code = doEnsureCapacity(p, &pDataBlock->info, numOfRows, true); if (code) { return code; } } + pDataBlock->info.capacity = numOfRows; return TSDB_CODE_SUCCESS; } diff --git a/source/common/src/trow.c b/source/common/src/trow.c index e4818aaa87..ac4b5f86de 100644 --- a/source/common/src/trow.c +++ b/source/common/src/trow.c @@ -1065,8 +1065,8 @@ void tdSTSRowIterInit(STSRowIter *pIter, STSchema *pSchema) { void tTSRowGetVal(STSRow *pRow, STSchema *pTSchema, int16_t iCol, SColVal *pColVal) { STColumn *pTColumn = &pTSchema->columns[iCol]; - SCellVal cv; - SValue value; + SCellVal cv = {0}; + SValue value = {0}; ASSERT((pTColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID) || (iCol > 0)); diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index 8b4cacf8d6..637a3818e6 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -240,25 +240,22 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) { return -1; } - int32_t numOfCols = pShow->pMeta->numOfColumns; - SSDataBlock *pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); - pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); + int32_t numOfCols = pShow->pMeta->numOfColumns; + SSDataBlock *pBlock = createDataBlock(); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData idata = {0}; - SSchema *p = &pShow->pMeta->pSchemas[i]; + + SSchema *p = &pShow->pMeta->pSchemas[i]; idata.info.bytes = p->bytes; idata.info.type = p->type; idata.info.colId = p->colId; - - taosArrayPush(pBlock->pDataBlock, &idata); - if (IS_VAR_DATA_TYPE(p->type)) { - pBlock->info.hasVarCol = true; - } + blockDataAppendColInfo(pBlock, &idata); } blockDataEnsureCapacity(pBlock, rowsToRead); + if (mndCheckRetrieveFinished(pShow)) { mDebug("show:0x%" PRIx64 ", read finished, numOfRows:%d", pShow->id, pShow->numOfRows); rowsRead = 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 4099dafa26..bfde5b3076 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -544,7 +544,6 @@ static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) taosMemoryFree(pResBlock); return NULL; } - return pResBlock; } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index e07a3475e0..32fe1ba849 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -336,8 +336,6 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SGroupbyOperatorInfo* pInfo = pOperator->info; - SSDataBlock* pRes = pInfo->binfo.pRes; - if (pOperator->status == OP_RES_TO_RETURN) { return buildGroupResultDataBlock(pOperator); } @@ -390,7 +388,6 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { } } #endif - blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, 0); pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; @@ -422,6 +419,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* } initResultSizeInfo(&pOperator->resultInfo, 4096); + blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); + code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols); if (code != TSDB_CODE_SUCCESS) { goto _error; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index be912c9f11..16fbe7d1db 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -373,17 +373,6 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo } } -static void ensureBlockCapacity(SSDataBlock* pBlock, int32_t capacity) { - // keep the value of rows temporarily - int32_t rows = pBlock->info.rows; - - pBlock->info.rows = 0; - blockDataEnsureCapacity(pBlock, capacity); - - // restore the rows number - pBlock->info.rows = rows; -} - static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -414,11 +403,6 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca } else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) { qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); - - if (pTableScanInfo->pseudoSup.numOfExprs > 0) { - ensureBlockCapacity(pBlock, pBlock->info.rows); - } - doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1); pCost->skipBlocks += 1; return TSDB_CODE_SUCCESS; @@ -429,10 +413,6 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca if (success) { // failed to load the block sma data, data block statistics does not exist, load data block instead qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); - if (pTableScanInfo->pseudoSup.numOfExprs > 0) { - ensureBlockCapacity(pBlock, pBlock->info.rows); - } - doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1); return TSDB_CODE_SUCCESS; } else { @@ -482,7 +462,6 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca return terrno; } - ensureBlockCapacity(pBlock, pBlock->info.rows); relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true); doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows); @@ -636,9 +615,13 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { } blockDataCleanup(pBlock); - SDataBlockInfo* pBInfo = &pBlock->info; - tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &pBInfo->rows, &pBInfo->uid, &pBInfo->window); + + int32_t rows = 0; + tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &rows, &pBInfo->uid, &pBInfo->window); + + blockDataEnsureCapacity(pBlock, rows); + pBInfo->rows = rows; ASSERT(pBInfo->uid != 0); pBlock->info.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.uid); @@ -686,7 +669,6 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) { while (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) { SSDataBlock* p = doTableScanImpl(pOperator); if (p != NULL) { - ASSERT(p->info.uid != 0); return p; } @@ -874,9 +856,12 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pInfo->sample.seed = taosGetTimestampSec(); pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired; + + initResultSizeInfo(&pOperator->resultInfo, 4096); pInfo->pResBlock = createResDataBlock(pDescNode); - pInfo->pFilterNode = pScanNode->node.pConditions; + blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity); + pInfo->pFilterNode = pScanNode->node.pConditions; if (pInfo->pFilterNode != NULL) { code = filterInitFromNode((SNode*)pInfo->pFilterNode, &pOperator->exprSupp.pFilterInfo, 0); if (code != TSDB_CODE_SUCCESS) { @@ -1002,12 +987,10 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { tSerializeBlockDistInfo(varDataVal(p), len, &blockDistInfo); varDataSetLen(p, len); - blockDataEnsureCapacity(pBlock, 1); colDataAppend(pColInfo, 0, p, false); taosMemoryFree(p); pBlock->info.rows = 1; - pOperator->status = OP_EXEC_DONE; return pBlock; } @@ -1073,7 +1056,9 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi pInfo->readHandle = *readHandle; pInfo->uid = pBlockScanNode->suid; + pInfo->pResBlock = createResDataBlock(pBlockScanNode->node.pOutputDataBlockDesc); + blockDataEnsureCapacity(pInfo->pResBlock, 1); int32_t numOfCols = 0; SExprInfo* pExprInfo = createExprInfo(pBlockScanNode->pScanPseudoCols, NULL, &numOfCols); @@ -4601,7 +4586,6 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; blockDataCleanup(pResBlock); - blockDataEnsureCapacity(pResBlock, capacity); while (1) { STupleHandle* pTupleHandle = tsortNextTuple(pHandle); @@ -4761,7 +4745,10 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN pInfo->tableListInfo = pTableListInfo; pInfo->scanFlag = MAIN_SCAN; + initResultSizeInfo(&pOperator->resultInfo, 1024); pInfo->pResBlock = createResDataBlock(pDescNode); + blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity); + pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam)); pInfo->pSortInfo = generateSortByTsInfo(pInfo->matchInfo.pList, pInfo->cond.order); @@ -4778,7 +4765,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN pOperator->info = pInfo; pOperator->exprSupp.numOfExprs = numOfCols; pOperator->pTaskInfo = pTaskInfo; - initResultSizeInfo(&pOperator->resultInfo, 1024); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL, NULL, destroyTableMergeScanOperatorInfo, getTableMergeScanExplainExecInfo); diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index b2c926f1e9..0427d78b20 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -579,12 +579,13 @@ int32_t doOpenMultiwayMergeOperator(SOperatorInfo* pOperator) { return TSDB_CODE_SUCCESS; } -SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, - SArray* pColMatchInfo, SOperatorInfo* pOperator) { +SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, SArray* pColMatchInfo, + SOperatorInfo* pOperator) { SMultiwayMergeOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; blockDataCleanup(pDataBlock); + int32_t capacity = pDataBlock->info.capacity; SSDataBlock* p = tsortGetSortedDataBlock(pHandle); if (p == NULL) { @@ -679,8 +680,7 @@ SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) { T_LONG_JMP(pTaskInfo->env, code); } - SSDataBlock* pBlock = getMultiwaySortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, - pOperator->resultInfo.capacity, pInfo->matchInfo.pList, pOperator); + SSDataBlock* pBlock = getMultiwaySortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->matchInfo.pList, pOperator); if (pBlock != NULL) { pOperator->resultInfo.totalRows += pBlock->info.rows; } else { @@ -742,7 +742,9 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); SSDataBlock* pInputBlock = createResDataBlock(pChildNode->pOutputDataBlockDesc); + initResultSizeInfo(&pOperator->resultInfo, 1024); + blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); pInfo->groupSort = pMergePhyNode->groupSort; pInfo->pSortInfo = createSortInfo(pMergePhyNode->pMergeKeys); -- GitLab