diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 0c38d43543db0df063381174528d886551ee8d8f..3afa91f2f982efd670f30d0360d65ac28cef07fd 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1186,6 +1186,7 @@ static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* } void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) { + pColumn->hasNull = false; if (IS_VAR_DATA_TYPE(pColumn->info.type)) { pColumn->varmeta.length = 0; if (pColumn->varmeta.offset != NULL) { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 1191b6a485655418b1d674f36a895a238a42acaf..16f094c85d3706964cb70e8cbaecb5d40bb51670 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -921,7 +921,7 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaul void doSetOperatorCompleted(SOperatorInfo* pOperator); void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SColMatchInfo* pColMatchInfo, SFilterInfo* pFilterInfo); int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, - SSDataBlock* pBlock, const char* idStr); + SSDataBlock* pBlock, int32_t rows, const char* idStr); void cleanupAggSup(SAggSupporter* pAggSup); void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 92e98d3eab205f8bb75fbdc28494e4d06c396193..bcd59e83f00f513682fdefbcbb20704efeca031d 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -152,25 +152,27 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { pInfo->indexOfBufferedRes = 0; } + SSDataBlock* pRes = pInfo->pRes; + if (pInfo->indexOfBufferedRes < pInfo->pBufferredRes->info.rows) { for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) { SColMatchItem* pMatchInfo = taosArrayGet(pInfo->matchInfo.pList, i); int32_t slotId = pMatchInfo->dstSlotId; SColumnInfoData* pSrc = taosArrayGet(pInfo->pBufferredRes->pDataBlock, slotId); - SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, slotId); + SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, slotId); char* p = colDataGetData(pSrc, pInfo->indexOfBufferedRes); bool isNull = colDataIsNull_s(pSrc, pInfo->indexOfBufferedRes); colDataAppend(pDst, 0, p, isNull); } - pInfo->pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, pInfo->indexOfBufferedRes); - pInfo->pRes->info.rows = 1; + pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, pInfo->indexOfBufferedRes); + pRes->info.rows = 1; if (pInfo->pseudoExprSup.numOfExprs > 0) { SExprSupp* pSup = &pInfo->pseudoExprSup; - int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, + int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes, pRes->info.rows, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; @@ -178,10 +180,9 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { } } - pInfo->pRes->info.groupId = getTableGroupId(pTableList, pInfo->pRes->info.uid); - + pRes->info.groupId = getTableGroupId(pTableList, pRes->info.uid); pInfo->indexOfBufferedRes += 1; - return pInfo->pRes; + return pRes; } else { doSetOperatorCompleted(pOperator); return NULL; @@ -221,7 +222,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { ASSERT((pInfo->retrieveType & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW); pInfo->pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, 0); - code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, + code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, pInfo->pRes->info.rows, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 750b9f258b2a202afca872f674b444f3ed0ba93c..b3226f292a2c0fff39b306879affb3d16f0a226f 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -344,11 +344,11 @@ static bool doLoadBlockSMA(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, return true; } -static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) { +static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, int32_t rows) { if (pTableScanInfo->pseudoSup.numOfExprs > 0) { SExprSupp* pSup = &pTableScanInfo->pseudoSup; - int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock, + int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock, rows, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); @@ -426,7 +426,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca // NOTE: here the tag value only load for one row ensureBlockCapacity(pBlock, 1); - doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo); + doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1); pCost->skipBlocks += 1; return TSDB_CODE_SUCCESS; } else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) { @@ -437,7 +437,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); ensureBlockCapacity(pBlock, 1); - doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo); + doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1); return TSDB_CODE_SUCCESS; } else { qDebug("%s failed to load SMA, since not all columns have SMA", GET_TASKID(pTaskInfo)); @@ -488,7 +488,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca ensureBlockCapacity(pBlock, pBlock->info.rows); relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true); - doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo); + doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows); // restore the previous value pCost->totalRows -= pBlock->info.rows; @@ -528,27 +528,30 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction } int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, - SSDataBlock* pBlock, const char* idStr) { + SSDataBlock* pBlock, int32_t rows, const char* idStr) { // currently only the tbname pseudo column if (numOfPseudoExpr == 0) { return TSDB_CODE_SUCCESS; } + // backup the rows + int32_t backupRows = pBlock->info.rows; + pBlock->info.rows = rows; + SMetaReader mr = {0}; metaReaderInit(&mr, pHandle->meta, 0); int32_t code = metaGetTableEntryByUid(&mr, pBlock->info.uid); + metaReaderReleaseLock(&mr); + if (code != TSDB_CODE_SUCCESS) { qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.uid, tstrerror(terrno), idStr); metaReaderClear(&mr); return terrno; } - metaReaderReleaseLock(&mr); - for (int32_t j = 0; j < numOfPseudoExpr; ++j) { SExprInfo* pExpr = &pPseudoExpr[j]; - - int32_t dstSlotId = pExpr->base.resSchema.slotId; + int32_t dstSlotId = pExpr->base.resSchema.slotId; SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId); colInfoDataCleanup(pColInfoData, pBlock->info.rows); @@ -587,6 +590,9 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int } metaReaderClear(&mr); + + // restore the rows + pBlock->info.rows = backupRows; return TSDB_CODE_SUCCESS; } @@ -631,7 +637,6 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { blockDataCleanup(pBlock); - SDataBlockInfo* pBInfo = &pBlock->info; tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &pBInfo->rows, &pBInfo->uid, &pBInfo->window); @@ -1167,7 +1172,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU pBlock->info.rows = rows; relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true); - doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo); + doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, rows); pBlock->info.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBInfo->uid); } @@ -1638,7 +1643,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock // currently only the tbname pseudo column if (pInfo->numOfPseudoExpr > 0) { int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes, - GET_TASKID(pTaskInfo)); + pInfo->pRes->info.rows, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { blockDataFreeRes((SSDataBlock*)pBlock); T_LONG_JMP(pTaskInfo->env, code); @@ -1648,11 +1653,11 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock if (filter) { doFilter(pInfo->pCondition, pInfo->pRes, NULL, NULL); } + blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); blockDataFreeRes((SSDataBlock*)pBlock); calBlockTbName(&pInfo->tbnameCalSup, pInfo->pRes); - return 0; } @@ -4368,7 +4373,7 @@ static int32_t loadDataBlockFromOneTable2(SOperatorInfo* pOperator, STableMergeS // currently only the tbname pseudo column if (pTableScanInfo->pseudoSup.numOfExprs > 0) { int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo, - pTableScanInfo->pseudoSup.numOfExprs, pBlock, GET_TASKID(pTaskInfo)); + pTableScanInfo->pseudoSup.numOfExprs, pBlock, pBlock->info.rows, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } @@ -4485,7 +4490,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc // currently only the tbname pseudo column if (pTableScanInfo->pseudoSup.numOfExprs > 0) { int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo, - pTableScanInfo->pseudoSup.numOfExprs, pBlock, GET_TASKID(pTaskInfo)); + pTableScanInfo->pseudoSup.numOfExprs, pBlock, pBlock->info.rows, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 5dcef7cd1764a25c36054778f0921610eb4e6bd5..b9acd3608805beb68ad47797a3ce27702c3246e2 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -497,7 +497,7 @@ bool getCountFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { return true; } -static FORCE_INLINE int32_t getNumOfElems(SqlFunctionCtx* pCtx) { +static int32_t getNumOfElems(SqlFunctionCtx* pCtx) { int32_t numOfElem = 0; /*