diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 8fd5e7f41e7aa9531d7d7382f7ed15de9be82d78..07686893db5412e4bb3a512c102a1dd39af2ecbb 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -113,7 +113,7 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs); -SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset); +SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowEntryInfoOffset); void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols); void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow); @@ -121,6 +121,6 @@ SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode); SColumn extractColumnFromColumnNode(SColumnNode* pColNode); int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode); -void cleanupQueryTableDataCond(SQueryTableDataCond* pCond); +void cleanupQueryTableDataCond(SQueryTableDataCond* pCond); #endif // TDENGINE_QUERYUTIL_H diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 5efb448a3863bcdd6eaec6a3290c133501205291..b2fc743818e83ebc307d39cc899a36f291107de8 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -83,7 +83,6 @@ typedef struct SResultInfo { // TODO refactor typedef struct STableQueryInfo { TSKEY lastKey; // last check ts, todo remove it later SResultRowPosition pos; // current active time window -// SVariant tag; } STableQueryInfo; typedef struct SLimit { @@ -156,8 +155,6 @@ typedef struct STaskAttr { } STaskAttr; struct SOperatorInfo; -//struct SAggSupporter; -//struct SOptrBasicInfo; typedef int32_t (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char** result, int32_t* length); typedef int32_t (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char* result); @@ -199,25 +196,15 @@ typedef struct STaskRuntimeEnv { STaskAttr* pQueryAttr; uint32_t status; // query status uint8_t scanFlag; // denotes reversed scan of data or not - SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file SHashObj* pResultRowHashTable; // quick locate the window object for each result SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not - SArray* pResultRowArrayList; // The array list that contains the Result rows char* keyBuf; // window key buffer - // The window result objects pool, all the resultRow Objects are allocated and managed by this object. - char** prevRow; - STSBuf* pTsBuf; // timestamp filter list - STSCursor cur; - - char* tagVal; // tag value of current data block -// STableGroupInfo tableqinfoGroupInfo; // this is a table list + STSCursor cur; + char* tagVal; // tag value of current data block struct SOperatorInfo* proot; - SGroupResInfo groupResInfo; - int64_t currentOffset; // dynamic offset value - + int64_t currentOffset; // dynamic offset value STableQueryInfo* current; SResultInfo resultInfo; - struct SUdfInfo* pUdfInfo; } STaskRuntimeEnv; enum { @@ -243,7 +230,7 @@ typedef struct SOperatorInfo { bool blocking; // block operator or not uint8_t status; // denote if current operator is completed int32_t numOfExprs; // number of columns of the current operator results - char* name; // name, used to show the query execution plan + char* name; // name, for debug purpose void* info; // extension attribution SExprInfo* pExpr; SExecTaskInfo* pTaskInfo; @@ -308,6 +295,13 @@ typedef struct SSampleExecInfo { uint32_t seed; // random seed value } SSampleExecInfo; +typedef struct SExecSupp { + SExprInfo* pExprInfo; + int32_t numOfExprs; // the number of scalar expression in group operator + SqlFunctionCtx* pCtx; + int32_t* rowEntryInfoOffset; // offset value for each row result cell info +} SExecSupp; + typedef struct STableScanInfo { void* dataReader; SReadHandle readHandle; @@ -320,17 +314,13 @@ typedef struct STableScanInfo { SNode* pFilterNode; // filter info, which is push down by optimizer SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context SResultRowInfo* pResultRowInfo; - int32_t* rowCellInfoOffset; + int32_t* rowEntryInfoOffset; SExprInfo* pExpr; SSDataBlock* pResBlock; SArray* pColMatchInfo; int32_t numOfOutput; - SExprInfo* pPseudoExpr; - int32_t numOfPseudoExpr; - SqlFunctionCtx* pPseudoCtx; -// int32_t* rowCellInfoOffset; - + SExecSupp pseudoSup; SQueryTableDataCond cond; int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan int32_t dataBlockLoadFlag; @@ -436,19 +426,20 @@ typedef struct SBlockDistInfo { void* pHandle; } SBlockDistInfo; +// todo remove this typedef struct SOptrBasicInfo { SResultRowInfo resultRowInfo; - int32_t* rowCellInfoOffset; // offset value for each row result cell info + int32_t* rowEntryInfoOffset; // offset value for each row result cell info SqlFunctionCtx* pCtx; SSDataBlock* pRes; } SOptrBasicInfo; -// TODO move the resultrowsiz together with SOptrBasicInfo:rowCellInfoOffset +// TODO move the resultrowsiz together with SOptrBasicInfo:rowEntryInfoOffset typedef struct SAggSupporter { SHashObj* pResultRowHashTable; // quick locate the window object for each result char* keyBuf; // window key buffer SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file - int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row + int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row, todo remove it } SAggSupporter; typedef struct STimeWindowSupp { @@ -503,7 +494,7 @@ typedef struct SAggOperatorInfo { SExprInfo *pScalarExprInfo; int32_t numOfScalarExpr; // the number of scalar expression before the aggregate function can be applied SqlFunctionCtx *pScalarCtx; // scalar function requried sql function struct. - int32_t *rowCellInfoOffset; // offset value for each row result cell info + int32_t *rowEntryInfoOffset; // offset value for each row result cell info } SAggOperatorInfo; typedef struct SProjectOperatorInfo { @@ -528,11 +519,7 @@ typedef struct SIndefOperatorInfo { SOptrBasicInfo binfo; SAggSupporter aggSup; SArray* pPseudoColInfo; - - SExprInfo* pScalarExpr; - int32_t numOfScalarExpr; - SqlFunctionCtx* pScalarCtx; - int32_t* rowCellInfoOffset; + SExecSupp scalarSup; } SIndefOperatorInfo; typedef struct SFillOperatorInfo { @@ -544,13 +531,6 @@ typedef struct SFillOperatorInfo { bool multigroupResult; } SFillOperatorInfo; -typedef struct SScalarSupp { - SExprInfo* pScalarExprInfo; - int32_t numOfScalarExpr; // the number of scalar expression in group operator - SqlFunctionCtx* pScalarFuncCtx; - int32_t* rowCellInfoOffset; // offset value for each row result cell info -} SScalarSupp; - typedef struct SGroupbyOperatorInfo { // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode SOptrBasicInfo binfo; @@ -563,7 +543,7 @@ typedef struct SGroupbyOperatorInfo { char* keyBuf; // group by keys for hash int32_t groupKeyLen; // total group by column width SGroupResInfo groupResInfo; - SScalarSupp scalarSup; + SExecSupp scalarSup; } SGroupbyOperatorInfo; typedef struct SDataGroupInfo { @@ -587,7 +567,7 @@ typedef struct SPartitionOperatorInfo { void* pGroupIter; // group iterator int32_t pageIndex; // page index of current group SSDataBlock* pUpdateRes; - SScalarSupp scalarSupp; + SExecSupp scalarSup; } SPartitionOperatorInfo; typedef struct SWindowRowsSup { @@ -764,9 +744,11 @@ void destroyBasicOperatorInfo(void* param, int32_t numOfOutput); void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle); void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId); +void cleanupExecSupp(SExecSupp* pSupp); + SSDataBlock* loadNextDataBlock(void* param); -void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset); +void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset); SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData, int16_t bytes, bool masterscan, uint64_t groupId, @@ -885,12 +867,13 @@ STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowI int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey, __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order); int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); -int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, - SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t size); +int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput, + int32_t size); SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize); SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, int64_t gap, int32_t* pIndex); -int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t rows, - int32_t start, int64_t gap, SHashObj* pStDeleted); +int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t rows, int32_t start, int64_t gap, + SHashObj* pStDeleted); + bool functionNeedToExecute(SqlFunctionCtx* pCtx); int32_t compareTimeWindow(const void* p1, const void* p2, const void* param); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index c8d92520133faa81423a35cca1a837c87d6b9c0b..99139be409ace233b565013a939e0c3cf066509c 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -42,21 +42,6 @@ void cleanupResultRowInfo(SResultRowInfo *pResultRowInfo) { } } -void resetResultRowInfo(STaskRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo) { - for (int32_t i = 0; i < pResultRowInfo->size; ++i) { -// SResultRow *pWindowRes = pResultRowInfo->pResult[i]; -// clearResultRow(pRuntimeEnv, pWindowRes); - - int32_t groupIndex = 0; - int64_t uid = 0; - - SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, &groupIndex, sizeof(groupIndex), uid); - taosHashRemove(pRuntimeEnv->pResultRowHashTable, (const char *)pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(groupIndex))); - } - - pResultRowInfo->size = 0; -} - void closeAllResultRows(SResultRowInfo *pResultRowInfo) { // do nothing } @@ -518,14 +503,14 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu return TSDB_CODE_SUCCESS; } -SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset) { +SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowEntryInfoOffset) { SqlFunctionCtx* pFuncCtx = (SqlFunctionCtx*)taosMemoryCalloc(numOfOutput, sizeof(SqlFunctionCtx)); if (pFuncCtx == NULL) { return NULL; } - *rowCellInfoOffset = taosMemoryCalloc(numOfOutput, sizeof(int32_t)); - if (*rowCellInfoOffset == 0) { + *rowEntryInfoOffset = taosMemoryCalloc(numOfOutput, sizeof(int32_t)); + if (*rowEntryInfoOffset == 0) { taosMemoryFreeClear(pFuncCtx); return NULL; } @@ -584,8 +569,8 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, } for (int32_t i = 1; i < numOfOutput; ++i) { - (*rowCellInfoOffset)[i] = - (int32_t)((*rowCellInfoOffset)[i - 1] + sizeof(SResultRowEntryInfo) + pFuncCtx[i - 1].resDataInfo.interBufSize); + (*rowEntryInfoOffset)[i] = + (int32_t)((*rowEntryInfoOffset)[i - 1] + sizeof(SResultRowEntryInfo) + pFuncCtx[i - 1].resDataInfo.interBufSize); } setSelectValueColumnInfo(pFuncCtx, numOfOutput); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 7ae9f543613086ce08e2d4bfac5e7ddbcc4d640a..976e9577eb008e26104f6474e1e19ce05c60cef3 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -147,32 +147,6 @@ static void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo); static void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* pTaskInfo); -SArray* getOrderCheckColumns(STaskAttr* pQuery); - -typedef struct SRowCompSupporter { - STaskRuntimeEnv* pRuntimeEnv; - int16_t dataOffset; - __compar_fn_t comFunc; -} SRowCompSupporter; - -static int compareRowData(const void* a, const void* b, const void* userData) { - const SResultRow* pRow1 = (const SResultRow*)a; - const SResultRow* pRow2 = (const SResultRow*)b; - - SRowCompSupporter* supporter = (SRowCompSupporter*)userData; - STaskRuntimeEnv* pRuntimeEnv = supporter->pRuntimeEnv; - - SFilePage* page1 = getBufPage(pRuntimeEnv->pResultBuf, pRow1->pageId); - SFilePage* page2 = getBufPage(pRuntimeEnv->pResultBuf, pRow2->pageId); - - int16_t offset = supporter->dataOffset; - return 0; - // char* in1 = getPosInResultPage(pRuntimeEnv->pQueryAttr, page1, pRow1->offset, offset); - // char* in2 = getPosInResultPage(pRuntimeEnv->pQueryAttr, page2, pRow2->offset, offset); - - // return (in1 != NULL && in2 != NULL) ? supporter->comFunc(in1, in2) : 0; -} - // setup the output buffer for each operator static bool hasNull(SColumn* pColumn, SColumnDataAgg* pStatis) { if (TSDB_COL_IS_TAG(pColumn->flag) || TSDB_COL_IS_UD_COL(pColumn->flag) || @@ -410,7 +384,7 @@ static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pRes static bool chkWindowOutputBufByKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, STimeWindow* win, bool masterscan, SResultRow** pResult, int64_t groupId, SqlFunctionCtx* pCtx, - int32_t numOfOutput, int32_t* rowCellInfoOffset) { + int32_t numOfOutput, int32_t* rowEntryInfoOffset) { assert(win->skey <= win->ekey); return chkResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE, masterscan, groupId); } @@ -1248,17 +1222,17 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc if (pQueryAttr->pointInterpQuery) { needFilter = chkWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, &win, masterScan, &pResult, groupId, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput, - pTableScanInfo->rowCellInfoOffset); + pTableScanInfo->rowEntryInfoOffset); } else { if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.uid, &win, masterScan, &pResult, groupId, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput, - pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) { + pTableScanInfo->rowEntryInfoOffset) != TSDB_CODE_SUCCESS) { longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } } } else if (pQueryAttr->stableQuery && (!pQueryAttr->tsCompQuery) && (!pQueryAttr->diffQuery)) { // stable aggregate, not interval aggregate or normal column aggregate doSetTableGroupOutputBuf(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pTableScanInfo->pCtx, - pTableScanInfo->rowCellInfoOffset, pTableScanInfo->numOfOutput, + pTableScanInfo->rowEntryInfoOffset, pTableScanInfo->numOfOutput, pRuntimeEnv->current->groupIndex); } @@ -1303,7 +1277,7 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr); if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.uid, &win, masterScan, &pResult, groupId, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput, - pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) { + pTableScanInfo->rowEntryInfoOffset) != TSDB_CODE_SUCCESS) { longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } } @@ -1389,7 +1363,7 @@ void initResultRow(SResultRow* pResultRow) { void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage, int32_t numOfExprs, SExecTaskInfo* pTaskInfo) { SqlFunctionCtx* pCtx = pInfo->pCtx; - int32_t* rowCellInfoOffset = pInfo->rowCellInfoOffset; + int32_t* rowEntryInfoOffset = pInfo->rowEntryInfoOffset; SResultRowInfo* pResultRowInfo = &pInfo->resultRowInfo; initResultRowInfo(pResultRowInfo); @@ -1400,7 +1374,7 @@ void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t pTaskInfo, false, pSup); for (int32_t i = 0; i < numOfExprs; ++i) { - struct SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, rowCellInfoOffset); + struct SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, rowEntryInfoOffset); cleanupResultRowEntry(pEntry); pCtx[i].resultInfo = pEntry; @@ -1441,9 +1415,9 @@ void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo) { // cleanupResultRowInfo(&pTableQueryInfo->resInfo); } -void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset) { +void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset) { for (int32_t i = 0; i < numOfOutput; ++i) { - pCtx[i].resultInfo = getResultEntryInfo(pResult, i, rowCellInfoOffset); + pCtx[i].resultInfo = getResultEntryInfo(pResult, i, rowEntryInfoOffset); struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo; if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) { @@ -1543,7 +1517,7 @@ void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, u SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo; SqlFunctionCtx* pCtx = pAggInfo->binfo.pCtx; - int32_t* rowCellInfoOffset = pAggInfo->binfo.rowCellInfoOffset; + int32_t* rowEntryInfoOffset = pAggInfo->binfo.rowEntryInfoOffset; SResultRow* pResultRow = doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId, sizeof(groupId), true, groupId, pTaskInfo, false, &pAggInfo->aggSup); @@ -1561,7 +1535,7 @@ void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, u } } - setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowCellInfoOffset); + setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset); } void setExecutionContext(int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* pTaskInfo, SAggOperatorInfo* pAggInfo) { @@ -1725,7 +1699,7 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG int32_t numOfExprs = pOperator->numOfExprs; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - int32_t* rowCellOffset = pbInfo->rowCellInfoOffset; + int32_t* rowCellOffset = pbInfo->rowEntryInfoOffset; SSDataBlock* pBlock = pbInfo->pRes; SqlFunctionCtx* pCtx = pbInfo->pCtx; @@ -1740,7 +1714,7 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG } static void updateNumOfRowsInResultRows(SqlFunctionCtx* pCtx, int32_t numOfOutput, SResultRowInfo* pResultRowInfo, - int32_t* rowCellInfoOffset) { + int32_t* rowEntryInfoOffset) { // update the number of result for each, only update the number of rows for the corresponding window result. // if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) { // return; @@ -1755,7 +1729,7 @@ static void updateNumOfRowsInResultRows(SqlFunctionCtx* pCtx, int32_t numOfOutpu continue; } - SResultRowEntryInfo* pCell = getResultEntryInfo(pResult, j, rowCellInfoOffset); + SResultRowEntryInfo* pCell = getResultEntryInfo(pResult, j, rowEntryInfoOffset); pResult->numOfRows = (uint16_t)(TMAX(pResult->numOfRows, pCell->numOfRes)); } } @@ -2909,7 +2883,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t goto _error; } - pInfo->binfo.pCtx = createSqlFunctionCtx(pExprInfo, num, &pInfo->binfo.rowCellInfoOffset); + pInfo->binfo.pCtx = createSqlFunctionCtx(pExprInfo, num, &pInfo->binfo.rowEntryInfoOffset); initResultRowInfo(&pInfo->binfo.resultRowInfo); if (pInfo->binfo.pCtx == NULL || pInfo->binfo.pRes == NULL) { @@ -3561,7 +3535,7 @@ void cleanupAggSup(SAggSupporter* pAggSup) { int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey) { - pBasicInfo->pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pBasicInfo->rowCellInfoOffset); + pBasicInfo->pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pBasicInfo->rowEntryInfoOffset); pBasicInfo->pRes = pResultBlock; doInitAggInfoSup(pAggSup, pBasicInfo->pCtx, numOfCols, keyBufSize, pkey); @@ -3607,7 +3581,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pInfo->pScalarExprInfo = pScalarExprInfo; pInfo->numOfScalarExpr = numOfScalarExpr; if (pInfo->pScalarExprInfo != NULL) { - pInfo->pScalarCtx = createSqlFunctionCtx(pScalarExprInfo, numOfScalarExpr, &pInfo->rowCellInfoOffset); + pInfo->pScalarCtx = createSqlFunctionCtx(pScalarExprInfo, numOfScalarExpr, &pInfo->rowEntryInfoOffset); } pOperator->name = "TableAggregate"; @@ -3659,7 +3633,7 @@ void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput) { assert(pInfo != NULL); destroySqlFunctionCtx(pInfo->pCtx, numOfOutput); - taosMemoryFreeClear(pInfo->rowCellInfoOffset); + taosMemoryFreeClear(pInfo->rowEntryInfoOffset); cleanupResultRowInfo(&pInfo->resultRowInfo); pInfo->pRes = blockDataDestroy(pInfo->pRes); @@ -3692,17 +3666,20 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) { taosArrayDestroy(pInfo->pPseudoColInfo); } +void cleanupExecSupp(SExecSupp* pSupp) { + destroySqlFunctionCtx(pSupp->pCtx, pSupp->numOfExprs); + destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs); + + taosMemoryFree(pSupp->rowEntryInfoOffset); +} + static void destroyIndefinitOperatorInfo(void* param, int32_t numOfOutput) { SIndefOperatorInfo* pInfo = (SIndefOperatorInfo*)param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); taosArrayDestroy(pInfo->pPseudoColInfo); cleanupAggSup(&pInfo->aggSup); - - destroySqlFunctionCtx(pInfo->pScalarCtx, numOfOutput); - destroyExprInfo(pInfo->pScalarExpr, pInfo->numOfScalarExpr); - - taosMemoryFree(pInfo->rowCellInfoOffset); + cleanupExecSupp(&pInfo->scalarSup); } void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) { @@ -3829,9 +3806,10 @@ static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) { } // there is an scalar expression that needs to be calculated before apply the group aggregation. - if (pIndefInfo->pScalarExpr != NULL) { - code = projectApplyFunctions(pIndefInfo->pScalarExpr, pBlock, pBlock, pIndefInfo->pScalarCtx, - pIndefInfo->numOfScalarExpr, pIndefInfo->pPseudoColInfo); + SExecSupp* pScalarSup = &pIndefInfo->scalarSup; + if (pScalarSup->pExprInfo != NULL) { + code = projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs, + pIndefInfo->pPseudoColInfo); if (code != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, code); } @@ -3872,8 +3850,9 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy int32_t numOfScalarExpr = 0; if (pPhyNode->pExprs != NULL) { - pInfo->pScalarExpr = createExprInfo(pPhyNode->pExprs, NULL, &numOfScalarExpr); - pInfo->pScalarCtx = createSqlFunctionCtx(pInfo->pScalarExpr, numOfScalarExpr, &pInfo->rowCellInfoOffset); + SExecSupp* pSup = &pInfo->scalarSup; + pSup->pExprInfo = createExprInfo(pPhyNode->pExprs, NULL, &pSup->numOfExprs); + pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, numOfScalarExpr, &pSup->rowEntryInfoOffset); } SSDataBlock* pResBlock = createResDataBlock(pPhyNode->node.pOutputDataBlockDesc); @@ -3893,7 +3872,6 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr, pTaskInfo); pInfo->binfo.pRes = pResBlock; - pInfo->numOfScalarExpr = numOfScalarExpr; pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pInfo->binfo.pCtx, numOfExpr); pOperator->name = "IndefinitOperator"; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 75ba1d5d7b2d7e60e9f15c8ded2a291d06bc7689..c46c1dc3ef4536dff556e0f27d5020e1037ed430 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -37,6 +37,7 @@ static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) { taosMemoryFreeClear(pInfo->keyBuf); taosArrayDestroy(pInfo->pGroupCols); taosArrayDestroy(pInfo->pGroupColVals); + cleanupExecSupp(&pInfo->scalarSup); } static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) { @@ -322,8 +323,8 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, scanFlag, true); // there is an scalar expression that needs to be calculated right before apply the group aggregation. - if (pInfo->scalarSup.pScalarExprInfo != NULL) { - pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pScalarExprInfo, pBlock, pBlock, pInfo->scalarSup.pScalarFuncCtx, pInfo->scalarSup.numOfScalarExpr, NULL); + if (pInfo->scalarSup.pExprInfo != NULL) { + pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx, pInfo->scalarSup.numOfExprs, NULL); if (pTaskInfo->code != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, pTaskInfo->code); } @@ -386,9 +387,9 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx pInfo->pGroupCols = pGroupColList; pInfo->pCondition = pCondition; - pInfo->scalarSup.pScalarExprInfo = pScalarExprInfo; - pInfo->scalarSup.numOfScalarExpr = numOfScalarExpr; - pInfo->scalarSup.pScalarFuncCtx = createSqlFunctionCtx(pScalarExprInfo, numOfScalarExpr, &pInfo->scalarSup.rowCellInfoOffset); + pInfo->scalarSup.pExprInfo = pScalarExprInfo; + pInfo->scalarSup.numOfExprs = numOfScalarExpr; + pInfo->scalarSup.pCtx = createSqlFunctionCtx(pScalarExprInfo, numOfScalarExpr, &pInfo->scalarSup.rowEntryInfoOffset); int32_t code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList); if (code != TSDB_CODE_SUCCESS) { @@ -645,8 +646,8 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) { } // there is an scalar expression that needs to be calculated right before apply the group aggregation. - if (pInfo->scalarSupp.pScalarExprInfo != NULL) { - pTaskInfo->code = projectApplyFunctions(pInfo->scalarSupp.pScalarExprInfo, pBlock, pBlock, pInfo->scalarSupp.pScalarFuncCtx, pInfo->scalarSupp.numOfScalarExpr, NULL); + if (pInfo->scalarSup.pExprInfo != NULL) { + pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx, pInfo->scalarSup.numOfExprs, NULL); if (pTaskInfo->code != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, pTaskInfo->code); } @@ -666,14 +667,18 @@ static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) { SPartitionOperatorInfo* pInfo = (SPartitionOperatorInfo*)param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); taosArrayDestroy(pInfo->pGroupCols); + for(int i = 0; i < taosArrayGetSize(pInfo->pGroupColVals); i++){ SGroupKeys key = *(SGroupKeys*)taosArrayGet(pInfo->pGroupColVals, i); taosMemoryFree(key.pData); } + taosArrayDestroy(pInfo->pGroupColVals); taosMemoryFree(pInfo->keyBuf); taosHashCleanup(pInfo->pGroupSet); taosMemoryFree(pInfo->columnOffset); + + cleanupExecSupp(&pInfo->scalarSup); } SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo) { @@ -691,10 +696,10 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition pInfo->pGroupCols = extractPartitionColInfo(pPartNode->pPartitionKeys); if (pPartNode->pExprs != NULL) { - pInfo->scalarSupp.numOfScalarExpr = 0; - pInfo->scalarSupp.pScalarExprInfo = createExprInfo(pPartNode->pExprs, NULL, &pInfo->scalarSupp.numOfScalarExpr); - pInfo->scalarSupp.pScalarFuncCtx = createSqlFunctionCtx( - pInfo->scalarSupp.pScalarExprInfo, pInfo->scalarSupp.numOfScalarExpr, &pInfo->scalarSupp.rowCellInfoOffset); + pInfo->scalarSup.numOfExprs = 0; + pInfo->scalarSup.pExprInfo = createExprInfo(pPartNode->pExprs, NULL, &pInfo->scalarSup.numOfExprs); + pInfo->scalarSup.pCtx = createSqlFunctionCtx( + pInfo->scalarSup.pExprInfo, pInfo->scalarSup.numOfExprs, &pInfo->scalarSup.rowEntryInfoOffset); } _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); @@ -752,6 +757,6 @@ int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* doSetResultOutBufByKey(pBuf, pResultRowInfo, (char*)pData, bytes, true, groupId, pTaskInfo, false, pAggSup); assert(pResultRow != NULL); - setResultRowInitCtx(pResultRow, pCtx, numOfCols, binfo->rowCellInfoOffset); + setResultRowInitCtx(pResultRow, pCtx, numOfCols, binfo->rowEntryInfoOffset); return TSDB_CODE_SUCCESS; } \ No newline at end of file diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 81fd442c57224e2e8219396423875ca7c0244888..73d6421ca3d16c45ce55688155a7af3e53501520 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -261,9 +261,9 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols); // currently only the tbname pseudo column - if (pTableScanInfo->numOfPseudoExpr > 0) { - addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pPseudoExpr, pTableScanInfo->numOfPseudoExpr, - pBlock); + if (pTableScanInfo->pseudoSup.numOfExprs > 0) { + SExecSupp* pSup = &pTableScanInfo->pseudoSup; + addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock); } int64_t st = taosGetTimestampMs(); @@ -538,8 +538,9 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, } if (pTableScanNode->scan.pScanPseudoCols != NULL) { - pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr); - pInfo->pPseudoCtx = createSqlFunctionCtx(pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, &pInfo->rowCellInfoOffset); + SExecSupp* pSup = &pInfo->pseudoSup; + pSup->pExprInfo = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pSup->numOfExprs); + pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset); } pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; @@ -1869,7 +1870,7 @@ typedef struct STableMergeScanInfo { SNode* pFilterNode; // filter info, which is push down by optimizer SqlFunctionCtx* pCtx; // which belongs to the direct upstream operator operator query context SResultRowInfo* pResultRowInfo; - int32_t* rowCellInfoOffset; + int32_t* rowEntryInfoOffset; SExprInfo* pExpr; SSDataBlock* pResBlock; SArray* pColMatchInfo; @@ -1878,7 +1879,7 @@ typedef struct STableMergeScanInfo { SExprInfo* pPseudoExpr; int32_t numOfPseudoExpr; SqlFunctionCtx* pPseudoCtx; - // int32_t* rowCellInfoOffset; + // int32_t* rowEntryInfoOffset; SQueryTableDataCond cond; int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan @@ -2257,7 +2258,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN if (pTableScanNode->scan.pScanPseudoCols != NULL) { pInfo->pPseudoExpr = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->numOfPseudoExpr); - pInfo->pPseudoCtx = createSqlFunctionCtx(pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, &pInfo->rowCellInfoOffset); + pInfo->pPseudoCtx = createSqlFunctionCtx(pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, &pInfo->rowEntryInfoOffset); } pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 9821e87249ae4603ac3f9c4c49509a1b7331f4a8..a16bceabfc7cb1a9226b2938edbf60562e590ebb 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -39,7 +39,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* SArray* pColMatchColInfo = extractColMatchInfo(pSortPhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID); - pInfo->binfo.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pInfo->binfo.rowCellInfoOffset); + pInfo->binfo.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pInfo->binfo.rowEntryInfoOffset); pInfo->binfo.pRes = pResBlock; initResultSizeInfo(pOperator, 1024); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 6e9cd0453e8293c7ca56baa3954a4b52a778c6c7..0c269b11fb177f25f7f57e7e2617b2eccb3bc88f 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -103,7 +103,7 @@ STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowI static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, bool masterscan, SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx, - int32_t numOfOutput, int32_t* rowCellInfoOffset, SAggSupporter* pAggSup, + int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup, SExecTaskInfo* pTaskInfo) { assert(win->skey <= win->ekey); SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE, @@ -118,7 +118,7 @@ static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindo pResultRow->win = (*win); *pResult = pResultRow; - setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowCellInfoOffset); + setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset); return TSDB_CODE_SUCCESS; } @@ -656,7 +656,7 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num STimeWindow w = pr->win; int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &w, (scanFlag == MAIN_SCAN), &pResult, groupId, pInfo->binfo.pCtx, - numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); + numOfOutput, pInfo->binfo.rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -812,7 +812,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pInfo->binfo.pCtx, - numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); + numOfOutput, pInfo->binfo.rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -836,7 +836,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul // restore current time window ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pInfo->binfo.pCtx, - numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); + numOfOutput, pInfo->binfo.rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -861,7 +861,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul // null data, failed to allocate more memory buffer int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, - pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, + pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); if (code != TSDB_CODE_SUCCESS || pResult == NULL) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -1042,7 +1042,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI pRowSup->win.ekey = pRowSup->win.skey; int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pInfo->binfo.pCtx, - numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); + numOfOutput, pInfo->binfo.rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } @@ -1068,7 +1068,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI pRowSup->win.ekey = tsList[pBlock->info.rows - 1]; int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid, pInfo->binfo.pCtx, - numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); + numOfOutput, pInfo->binfo.rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } @@ -1166,7 +1166,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) { // todo merged with the build group result. static void finalizeUpdatedResult(int32_t numOfOutput, SDiskbasedBuf* pBuf, SArray* pUpdateList, - int32_t* rowCellInfoOffset) { + int32_t* rowEntryInfoOffset) { size_t num = taosArrayGetSize(pUpdateList); for (int32_t i = 0; i < num; ++i) { @@ -1176,7 +1176,7 @@ static void finalizeUpdatedResult(int32_t numOfOutput, SDiskbasedBuf* pBuf, SArr SResultRow* pRow = (SResultRow*)((char*)bufPage + pPos->pos.offset); for (int32_t j = 0; j < numOfOutput; ++j) { - SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, j, rowCellInfoOffset); + SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, j, rowEntryInfoOffset); if (pRow->numOfRows < pEntry->numOfRes) { pRow->numOfRows = pEntry->numOfRes; } @@ -1199,7 +1199,7 @@ void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SOptrB SResultRow* pResult = getResultRowByPos(pResultBuf, p1); SqlFunctionCtx* pCtx = pBinfo->pCtx; for (int32_t i = 0; i < numOfOutput; ++i) { - pCtx[i].resultInfo = getResultEntryInfo(pResult, i, pBinfo->rowCellInfoOffset); + pCtx[i].resultInfo = getResultEntryInfo(pResult, i, pBinfo->rowEntryInfoOffset); struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo; if (fmIsWindowPseudoColumnFunc(pCtx[i].functionId)) { continue; @@ -1338,7 +1338,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { } closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pUpdated); - finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset); + finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowEntryInfoOffset); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); @@ -1603,7 +1603,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator pRowSup->win.ekey = pRowSup->win.skey; int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pInfo->binfo.pCtx, - numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); + numOfOutput, pInfo->binfo.rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } @@ -1623,7 +1623,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator pRowSup->win.ekey = tsList[pBlock->info.rows - 1]; int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid, pInfo->binfo.pCtx, - numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); + numOfOutput, pInfo->binfo.rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } @@ -2068,14 +2068,14 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SArra STimeWindow* pParentWin = taosArrayGet(pWinArray, i); SResultRow* pCurResult = NULL; setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, pParentWin, true, &pCurResult, 0, pInfo->binfo.pCtx, - numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); + numOfOutput, pInfo->binfo.rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren); for (int32_t j = 0; j < numOfChildren; j++) { SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, j); SIntervalAggOperatorInfo* pChInfo = pChildOp->info; SResultRow* pChResult = NULL; setTimeWindowOutputBuf(&pChInfo->binfo.resultRowInfo, pParentWin, true, &pChResult, 0, pChInfo->binfo.pCtx, - pChildOp->numOfExprs, pChInfo->binfo.rowCellInfoOffset, &pChInfo->aggSup, pTaskInfo); + pChildOp->numOfExprs, pChInfo->binfo.rowEntryInfoOffset, &pChInfo->aggSup, pTaskInfo); compactFunctions(pInfo->binfo.pCtx, pChInfo->binfo.pCtx, numOfOutput, pTaskInfo); } } @@ -2121,7 +2121,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc taosArrayDestroy(pUpWins); } int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, true, &pResult, tableGroupId, pInfo->binfo.pCtx, - numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); + numOfOutput, pInfo->binfo.rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); if (code != TSDB_CODE_SUCCESS || pResult == NULL) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -2267,7 +2267,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pUpdated); } - finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset); + finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowEntryInfoOffset); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); @@ -2398,7 +2398,7 @@ void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) { } int32_t initBiasicInfo(SOptrBasicInfo* pBasicInfo, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock) { - pBasicInfo->pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pBasicInfo->rowCellInfoOffset); + pBasicInfo->pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pBasicInfo->rowEntryInfoOffset); pBasicInfo->pRes = pResultBlock; for (int32_t i = 0; i < numOfCols; ++i) { pBasicInfo->pCtx[i].pBuf = NULL; @@ -2592,7 +2592,7 @@ int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t } static int32_t setWindowOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pResult, SqlFunctionCtx* pCtx, - uint64_t groupId, int32_t numOfOutput, int32_t* rowCellInfoOffset, + uint64_t groupId, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SStreamAggSupporter* pAggSup, SExecTaskInfo* pTaskInfo) { assert(pWinInfo->win.skey <= pWinInfo->win.ekey); // too many time window in query @@ -2621,7 +2621,7 @@ static int32_t setWindowOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pRes // set time window for current result (*pResult)->win = pWinInfo->win; - setResultRowInitCtx(*pResult, pCtx, numOfOutput, rowCellInfoOffset); + setResultRowInitCtx(*pResult, pCtx, numOfOutput, rowEntryInfoOffset); return TSDB_CODE_SUCCESS; } @@ -2632,7 +2632,7 @@ static int32_t doOneWindowAggImpl(int32_t tsColId, SOptrBasicInfo* pBinfo, SStre SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, tsColId); TSKEY* tsCols = (int64_t*)pColDataInfo->pData; int32_t code = setWindowOutputBuf(pCurWin, pResult, pBinfo->pCtx, pSDataBlock->info.groupId, numOutput, - pBinfo->rowCellInfoOffset, pAggSup, pTaskInfo); + pBinfo->rowEntryInfoOffset, pAggSup, pTaskInfo); if (code != TSDB_CODE_SUCCESS || (*pResult) == NULL) { return TSDB_CODE_QRY_OUT_OF_MEMORY; } @@ -2674,7 +2674,7 @@ void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex, int32_t numOfOutput, SExecTaskInfo* pTaskInfo, SHashObj* pStUpdated, SHashObj* pStDeleted) { SResultWindowInfo* pCurWin = taosArrayGet(pInfo->streamAggSup.pCurWins, startIndex); SResultRow* pCurResult = NULL; - setWindowOutputBuf(pCurWin, &pCurResult, pInfo->binfo.pCtx, groupId, numOfOutput, pInfo->binfo.rowCellInfoOffset, + setWindowOutputBuf(pCurWin, &pCurResult, pInfo->binfo.pCtx, groupId, numOfOutput, pInfo->binfo.rowEntryInfoOffset, &pInfo->streamAggSup, pTaskInfo); num += startIndex + 1; ASSERT(num <= taosArrayGetSize(pInfo->streamAggSup.pCurWins)); @@ -2682,7 +2682,7 @@ void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex, for (int32_t i = startIndex + 1; i < num; i++) { SResultWindowInfo* pWinInfo = taosArrayGet(pInfo->streamAggSup.pCurWins, i); SResultRow* pWinResult = NULL; - setWindowOutputBuf(pWinInfo, &pWinResult, pInfo->pDummyCtx, groupId, numOfOutput, pInfo->binfo.rowCellInfoOffset, + setWindowOutputBuf(pWinInfo, &pWinResult, pInfo->pDummyCtx, groupId, numOfOutput, pInfo->binfo.rowEntryInfoOffset, &pInfo->streamAggSup, pTaskInfo); pCurWin->win.ekey = TMAX(pCurWin->win.ekey, pWinInfo->win.ekey); compactFunctions(pInfo->binfo.pCtx, pInfo->pDummyCtx, numOfOutput, pTaskInfo); @@ -2815,7 +2815,7 @@ static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWin for (int32_t i = 0; i < size; i++) { SResultWindowInfo* pParentWin = taosArrayGet(pWinArray, i); SResultRow* pCurResult = NULL; - setWindowOutputBuf(pParentWin, &pCurResult, pInfo->binfo.pCtx, groupId, numOfOutput, pInfo->binfo.rowCellInfoOffset, + setWindowOutputBuf(pParentWin, &pCurResult, pInfo->binfo.pCtx, groupId, numOfOutput, pInfo->binfo.rowEntryInfoOffset, &pInfo->streamAggSup, pTaskInfo); int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren); for (int32_t j = 0; j < numOfChildren; j++) { @@ -2829,7 +2829,7 @@ static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWin if (pParentWin->win.skey <= pcw->win.skey && pcw->win.ekey <= pParentWin->win.ekey) { SResultRow* pChResult = NULL; setWindowOutputBuf(pcw, &pChResult, pChInfo->binfo.pCtx, groupId, numOfOutput, - pChInfo->binfo.rowCellInfoOffset, &pChInfo->streamAggSup, pTaskInfo); + pChInfo->binfo.rowEntryInfoOffset, &pChInfo->streamAggSup, pTaskInfo); compactFunctions(pInfo->binfo.pCtx, pChInfo->binfo.pCtx, numOfOutput, pTaskInfo); continue; } @@ -2956,7 +2956,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { taosHashCleanup(pStUpdated); finalizeUpdatedResult(pOperator->numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated, - pInfo->binfo.rowCellInfoOffset); + pInfo->binfo.rowEntryInfoOffset); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); @@ -3310,7 +3310,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { taosHashCleanup(pSeUpdated); finalizeUpdatedResult(pOperator->numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated, - pInfo->binfo.rowCellInfoOffset); + pInfo->binfo.rowEntryInfoOffset); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); doBuildDeleteDataBlock(pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator); @@ -3428,7 +3428,7 @@ static int32_t outputMergeIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t ASSERT(p1 != NULL); finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, iaInfo->binfo.pCtx, pOperatorInfo->pExpr, - pOperatorInfo->numOfExprs, iaInfo->binfo.rowCellInfoOffset, pResultBlock, + pOperatorInfo->numOfExprs, iaInfo->binfo.rowEntryInfoOffset, pResultBlock, pTaskInfo); taosHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); @@ -3456,7 +3456,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* //TODO: remove the hash table usage (groupid + winkey => result row position) int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, iaInfo->binfo.pCtx, - numOfOutput, iaInfo->binfo.rowCellInfoOffset, &iaInfo->aggSup, pTaskInfo); + numOfOutput, iaInfo->binfo.rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -3483,7 +3483,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* currWin.ekey = taosTimeAdd(currWin.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit, iaInfo->interval.precision) - 1; startPos = currPos; ret = setTimeWindowOutputBuf(pResultRowInfo, &currWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, iaInfo->binfo.pCtx, - numOfOutput, iaInfo->binfo.rowCellInfoOffset, &iaInfo->aggSup, pTaskInfo); + numOfOutput, iaInfo->binfo.rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); }