From 1444f0b466a24f4135bad4591acd79b11bd8ec96 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 8 Jan 2022 16:28:44 +0800 Subject: [PATCH] [td-11818] refactor --- include/libs/executor/executor.h | 36 +- include/libs/function/function.h | 1 - source/libs/executor/CMakeLists.txt | 2 +- source/libs/executor/inc/executil.h | 18 +- source/libs/executor/inc/executorimpl.h | 89 ++-- source/libs/executor/src/executil.c | 24 +- source/libs/executor/src/executorimpl.c | 543 +++++++++++------------ source/libs/scheduler/inc/schedulerInt.h | 1 - 8 files changed, 341 insertions(+), 373 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index c3c7d740f7..17c11b5d09 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -20,16 +20,16 @@ extern "C" { #endif -typedef void* qinfo_t; +typedef void* qTaskInfo_t; /** * create the qinfo object according to QueryTableMsg * @param tsdb * @param pQueryTableMsg - * @param qinfo + * @param pTaskInfo * @return */ -int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableInfo* pQueryTableMsg, qinfo_t* qinfo, uint64_t qId); +int32_t qCreateTask(void* tsdb, int32_t vgId, void* pQueryTableMsg, qTaskInfo_t* pTaskInfo, uint64_t qId); /** * the main query execution function, including query on both table and multiple tables, @@ -38,7 +38,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableInfo* pQueryTableM * @param qinfo * @return */ -bool qTableQuery(qinfo_t qinfo, uint64_t *qId); +bool qExecTask(qTaskInfo_t qinfo, uint64_t *qId); /** * Retrieve the produced results information, if current query is not paused or completed, @@ -48,7 +48,7 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId); * @param qinfo * @return */ -int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContext); +int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspContext); /** * @@ -60,41 +60,41 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex * @param contLen payload length * @return */ -int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* contLen, bool* continueExec); +int32_t qDumpRetrieveResult(qTaskInfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* contLen, bool* continueExec); /** * return the transporter context (RPC) * @param qinfo * @return */ -void* qGetResultRetrieveMsg(qinfo_t qinfo); +void* qGetResultRetrieveMsg(qTaskInfo_t qinfo); /** * kill the ongoing query and free the query handle and corresponding resources automatically * @param qinfo qhandle * @return */ -int32_t qKillQuery(qinfo_t qinfo); +int32_t qKillTask(qTaskInfo_t qinfo); /** * return whether query is completed or not * @param qinfo * @return */ -int32_t qIsQueryCompleted(qinfo_t qinfo); +int32_t qIsQueryCompleted(qTaskInfo_t qinfo); /** * destroy query info structure * @param qHandle */ -void qDestroyQueryInfo(qinfo_t qHandle); +void qDestroyTask(qTaskInfo_t qHandle); /** * Get the queried table uid * @param qHandle * @return */ -int64_t qGetQueriedTableUid(qinfo_t qHandle); +int64_t qGetQueriedTableUid(qTaskInfo_t qHandle); /** * Extract the qualified table id list, and than pass them to the TSDB driver to load the required table data blocks. @@ -121,7 +121,7 @@ int32_t qCreateTableGroupByGroupExpr(SArray* pTableIdList, TSKEY skey, STableGro * @param type operation type: ADD|DROP * @return */ -int32_t qUpdateQueriedTableIdList(qinfo_t qinfo, int64_t uid, int32_t type); +int32_t qUpdateQueriedTableIdList(qTaskInfo_t qinfo, int64_t uid, int32_t type); //================================================================================================ // query handle management @@ -130,13 +130,13 @@ int32_t qUpdateQueriedTableIdList(qinfo_t qinfo, int64_t uid, int32_t type); * @param vgId * @return */ -void* qOpenQueryMgmt(int32_t vgId); +void* qOpenTaskMgmt(int32_t vgId); /** * broadcast the close information and wait for all query stop. * @param pExecutor */ -void qQueryMgmtNotifyClosed(void* pExecutor); +void qTaskMgmtNotifyClosing(void* pExecutor); /** * Re-open the query handle management module when opening the vnode again. @@ -148,7 +148,7 @@ void qQueryMgmtReOpen(void *pExecutor); * Close query mgmt and clean up resources. * @param pExecutor */ -void qCleanupQueryMgmt(void* pExecutor); +void qCleanupTaskMgmt(void* pExecutor); /** * Add the query into the query mgmt object @@ -157,7 +157,7 @@ void qCleanupQueryMgmt(void* pExecutor); * @param qInfo * @return */ -void** qRegisterQInfo(void* pMgmt, uint64_t qId, void *qInfo); +void** qRegisterTask(void* pMgmt, uint64_t qId, void *qInfo); /** * acquire the query handle according to the key from query mgmt object. @@ -165,7 +165,7 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qId, void *qInfo); * @param key * @return */ -void** qAcquireQInfo(void* pMgmt, uint64_t key); +void** qAcquireTask(void* pMgmt, uint64_t key); /** * release the query handle and decrease the reference count in cache @@ -174,7 +174,7 @@ void** qAcquireQInfo(void* pMgmt, uint64_t key); * @param freeHandle * @return */ -void** qReleaseQInfo(void* pMgmt, void* pQInfo); +void** qReleaseTask(void* pMgmt, void* pQInfo, bool freeHandle); /** * De-register the query handle from the management module and free it immediately. diff --git a/include/libs/function/function.h b/include/libs/function/function.h index d7360a81bc..49a2fd5903 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -183,7 +183,6 @@ typedef struct tExprNode { struct {// function node char functionName[FUNCTIONS_NAME_MAX_LENGTH]; -// int32_t functionId; int32_t num; // Note that the attribute of pChild is not the parameter of function, it is the columns that involved in the diff --git a/source/libs/executor/CMakeLists.txt b/source/libs/executor/CMakeLists.txt index a6f70b9e83..a6a1ac722b 100644 --- a/source/libs/executor/CMakeLists.txt +++ b/source/libs/executor/CMakeLists.txt @@ -8,5 +8,5 @@ target_include_directories( target_link_libraries( executor - PRIVATE os util common function parser + PRIVATE os util common function parser qcom ) \ No newline at end of file diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 7e910d5674..2c1bf71638 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -88,37 +88,37 @@ typedef struct SResultRowPool { SArray* pData; // SArray } SResultRowPool; -struct SQueryAttr; -struct SQueryRuntimeEnv; +struct STaskAttr; +struct STaskRuntimeEnv; struct SUdfInfo; -int32_t getOutputInterResultBufSize(struct SQueryAttr* pQueryAttr); +int32_t getOutputInterResultBufSize(struct STaskAttr* pQueryAttr); -size_t getResultRowSize(struct SQueryRuntimeEnv* pRuntimeEnv); +size_t getResultRowSize(struct STaskRuntimeEnv* pRuntimeEnv); int32_t initResultRowInfo(SResultRowInfo* pResultRowInfo, int32_t size, int16_t type); void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo); -void resetResultRowInfo(struct SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo); +void resetResultRowInfo(struct STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo); int32_t numOfClosedResultRows(SResultRowInfo* pResultRowInfo); void closeAllResultRows(SResultRowInfo* pResultRowInfo); int32_t initResultRow(SResultRow *pResultRow); void closeResultRow(SResultRowInfo* pResultRowInfo, int32_t slot); bool isResultRowClosed(SResultRowInfo *pResultRowInfo, int32_t slot); -void clearResultRow(struct SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pResultRow, int16_t type); +void clearResultRow(struct STaskRuntimeEnv* pRuntimeEnv, SResultRow* pResultRow, int16_t type); struct SResultRowEntryInfo* getResultCell(const SResultRow* pRow, int32_t index, int32_t* offset); void* destroyQueryFuncExpr(SExprInfo* pExprInfo, int32_t numOfExpr); void* freeColumnInfo(SColumnInfo* pColumnInfo, int32_t numOfCols); -int32_t getRowNumForMultioutput(struct SQueryAttr* pQueryAttr, bool topBottomQuery, bool stable); +int32_t getRowNumForMultioutput(struct STaskAttr* pQueryAttr, bool topBottomQuery, bool stable); static FORCE_INLINE SResultRow *getResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) { assert(pResultRowInfo != NULL && slot >= 0 && slot < pResultRowInfo->size); return pResultRowInfo->pResult[slot]; } -static FORCE_INLINE char* getPosInResultPage(struct SQueryAttr* pQueryAttr, SFilePage* page, int32_t rowOffset, +static FORCE_INLINE char* getPosInResultPage(struct STaskAttr* pQueryAttr, SFilePage* page, int32_t rowOffset, int32_t offset) { assert(rowOffset >= 0 && pQueryAttr != NULL); @@ -155,7 +155,7 @@ bool hasRemainData(SGroupResInfo* pGroupResInfo); bool incNextGroup(SGroupResInfo* pGroupResInfo); int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo); -int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, struct SQueryRuntimeEnv *pRuntimeEnv, int32_t* offset); +int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, struct STaskRuntimeEnv *pRuntimeEnv, int32_t* offset); int32_t initUdfInfo(struct SUdfInfo* pUdfInfo); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 907fb4d2bf..32bcb58bc0 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -21,7 +21,6 @@ #include "tvariant.h" #include "thash.h" -//#include "parser.h" #include "executil.h" #include "taosdef.h" #include "tarray.h" @@ -166,7 +165,7 @@ typedef struct { // The basic query information extracted from the SQueryInfo tree to support the // execution of query in a data node. -typedef struct SQueryAttr { +typedef struct STaskAttr { SLimit limit; SLimit slimit; @@ -229,16 +228,16 @@ typedef struct SQueryAttr { STableGroupInfo tableGroupInfo; // table list SArray int32_t vgId; SArray *pUdfInfo; // no need to free -} SQueryAttr; +} STaskAttr; typedef SSDataBlock* (*__operator_fn_t)(void* param, bool* newgroup); typedef void (*__optr_cleanup_fn_t)(void* param, int32_t num); struct SOperatorInfo; -typedef struct SQueryRuntimeEnv { +typedef struct STaskRuntimeEnv { jmp_buf env; - SQueryAttr* pQueryAttr; + STaskAttr* pQueryAttr; uint32_t status; // query status void* qinfo; uint8_t scanFlag; // denotes reversed scan of data or not @@ -271,7 +270,7 @@ typedef struct SQueryRuntimeEnv { SRspResultInfo resultInfo; SHashObj *pTableRetrieveTsMap; struct SUdfInfo *pUdfInfo; -} SQueryRuntimeEnv; +} STaskRuntimeEnv; enum { OP_IN_EXECUTING = 1, @@ -287,9 +286,9 @@ typedef struct SOperatorInfo { char *name; // name, used to show the query execution plan void *info; // extension attribution SExprInfo *pExpr; - SQueryRuntimeEnv *pRuntimeEnv; + STaskRuntimeEnv *pRuntimeEnv; - struct SOperatorInfo **upstream; // upstream pointer list + struct SOperatorInfo ** pDownstream; // upstream pointer list int32_t numOfUpstream; // number of upstream. The value is always ONE expect for join operator __operator_fn_t exec; __optr_cleanup_fn_t cleanup; @@ -312,8 +311,8 @@ typedef struct SQInfo { int32_t code; // error code to returned to client int64_t owner; // if it is in execution - SQueryRuntimeEnv runtimeEnv; - SQueryAttr query; + STaskRuntimeEnv runtimeEnv; + STaskAttr query; void* pBuf; // allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables; pthread_mutex_t lock; // used to synchronize the rsp/query threads @@ -325,7 +324,7 @@ typedef struct SQInfo { SQueryCostInfo summary; } SQInfo; -typedef struct SQueryParam { +typedef struct STaskParam { char *sql; char *tagCond; char *colCond; @@ -345,7 +344,7 @@ typedef struct SQueryParam { int32_t tableScanOperator; SArray *pOperator; struct SUdfInfo *pUdfInfo; -} SQueryParam; +} STaskParam; typedef struct STableScanInfo { void *pQueryHandle; @@ -512,34 +511,34 @@ typedef struct SOrderOperatorInfo { void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream); -SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime); -SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime); -SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv); - -SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); -SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createAllTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult); -SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv); -SOperatorInfo* createMultiwaySortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput, +SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime); +SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv, int32_t repeatTime); +SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv); + +SOperatorInfo* createAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); +SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); +SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); +SOperatorInfo* createTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); +SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); +SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); +SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult); +SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); +SOperatorInfo* createMultiTableAggOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); +SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); +SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); +SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput); +SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); +SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv); +SOperatorInfo* createMultiwaySortOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows, void* merger); -SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp); -SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger, bool multigroupResult); -SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, +SOperatorInfo* createGlobalAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp); +SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); +SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger, bool multigroupResult); +SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter); SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput); -SOperatorInfo* createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrder* pOrderVal); +SOperatorInfo* createOrderOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrder* pOrderVal); SSDataBlock* doGlobalAggregate(void* param, bool* newgroup); SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup); @@ -561,8 +560,8 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity); void copyTsColoum(SSDataBlock* pRes, SQLFunctionCtx* pCtx, int32_t numOfOutput); -void freeParam(SQueryParam *param); -int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param); +void freeParam(STaskParam *param); +int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, STaskParam* param); int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExprInfo** pExprInfo, SSqlExpr** pExprMsg, SColumnInfo* pTagCols, int32_t queryType, void* pMsg, struct SUdfInfo* pUdfInfo); @@ -575,13 +574,13 @@ SGroupbyExpr *createGroupbyExprFromMsg(SQueryTableMsg *pQueryMsg, SColIndex *pCo SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SGroupbyExpr *pGroupbyExpr, SExprInfo *pExprs, SExprInfo *pSecExprs, STableGroupInfo *pTableGroupInfo, SColumnInfo* pTagCols, SFilterInfo* pFilters, int32_t vgId, char* sql, uint64_t qId, struct SUdfInfo* pUdfInfo); -int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, SQueryParam* param, char* start, +int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, STaskParam* param, char* start, int32_t prevResultLen, void* merger); -int32_t createFilterInfo(SQueryAttr* pQueryAttr, uint64_t qId); +int32_t createFilterInfo(STaskAttr* pQueryAttr, uint64_t qId); void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters); -STableQueryInfo *createTableQueryInfo(SQueryAttr* pQueryAttr, void* pTable, bool groupbyColumn, STimeWindow win, void* buf); +STableQueryInfo *createTableQueryInfo(STaskAttr* pQueryAttr, void* pTable, bool groupbyColumn, STimeWindow win, void* buf); STableQueryInfo* createTmpTableQueryInfo(STimeWindow win); int32_t buildArithmeticExprFromMsg(SExprInfo *pArithExprInfo, void *pQueryMsg); @@ -590,9 +589,9 @@ bool isQueryKilled(SQInfo *pQInfo); int32_t checkForQueryBuf(size_t numOfTables); bool checkNeedToCompressQueryCol(SQInfo *pQInfo); bool doBuildResCheck(SQInfo* pQInfo); -void setQueryStatus(SQueryRuntimeEnv *pRuntimeEnv, int8_t status); +void setQueryStatus(STaskRuntimeEnv *pRuntimeEnv, int8_t status); -bool onlyQueryTags(SQueryAttr* pQueryAttr); +bool onlyQueryTags(STaskAttr* pQueryAttr); void destroyUdfInfo(struct SUdfInfo* pUdfInfo); bool isValidQInfo(void *param); @@ -607,8 +606,8 @@ void publishQueryAbortEvent(SQInfo* pQInfo, int32_t code); void calculateOperatorProfResults(SQInfo* pQInfo); void queryCostStatis(SQInfo *pQInfo); -void freeQInfo(SQInfo *pQInfo); -void freeQueryAttr(SQueryAttr *pQuery); +void doDestroyTask(SQInfo *pQInfo); +void freeQueryAttr(STaskAttr *pQuery); int32_t getMaximumIdleDurationSec(); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index ac91f906c7..e8ecffb72c 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -30,7 +30,7 @@ typedef struct SCompSupporter { int32_t order; } SCompSupporter; -int32_t getRowNumForMultioutput(SQueryAttr* pQueryAttr, bool topBottomQuery, bool stable) { +int32_t getRowNumForMultioutput(STaskAttr* pQueryAttr, bool topBottomQuery, bool stable) { if (pQueryAttr && (!stable)) { for (int16_t i = 0; i < pQueryAttr->numOfOutput; ++i) { // if (pQueryAttr->pExpr1[i].base. == FUNCTION_TOP || pQueryAttr->pExpr1[i].base.functionId == FUNCTION_BOTTOM) { @@ -42,7 +42,7 @@ int32_t getRowNumForMultioutput(SQueryAttr* pQueryAttr, bool topBottomQuery, boo return 1; } -int32_t getOutputInterResultBufSize(SQueryAttr* pQueryAttr) { +int32_t getOutputInterResultBufSize(STaskAttr* pQueryAttr) { int32_t size = 0; for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) { @@ -86,7 +86,7 @@ void cleanupResultRowInfo(SResultRowInfo *pResultRowInfo) { tfree(pResultRowInfo->pResult); } -void resetResultRowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo) { +void resetResultRowInfo(STaskRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo) { if (pResultRowInfo == NULL || pResultRowInfo->capacity == 0) { return; } @@ -136,7 +136,7 @@ void closeResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) { getResultRow(pResultRowInfo, slot)->closed = true; } -void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16_t type) { +void clearResultRow(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16_t type) { if (pResultRow == NULL) { return; } @@ -174,8 +174,8 @@ struct SResultRowEntryInfo* getResultCell(const SResultRow* pRow, int32_t index, return NULL; } -size_t getResultRowSize(SQueryRuntimeEnv* pRuntimeEnv) { - SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; +size_t getResultRowSize(STaskRuntimeEnv* pRuntimeEnv) { + STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; return 0; // return (pQueryAttr->numOfOutput * sizeof(SResultRowEntryInfo)) + pQueryAttr->interBufSize + sizeof(SResultRow); } @@ -393,8 +393,8 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) { return (int32_t) taosArrayGetSize(pGroupResInfo->pRows); } -static int64_t getNumOfResultWindowRes(SQueryRuntimeEnv* pRuntimeEnv, SResultRow *pResultRow, int32_t* rowCellInfoOffset) { - SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; +static int64_t getNumOfResultWindowRes(STaskRuntimeEnv* pRuntimeEnv, SResultRow *pResultRow, int32_t* rowCellInfoOffset) { + STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; for (int32_t j = 0; j < pQueryAttr->numOfOutput; ++j) { int32_t functionId = 0;//pQueryAttr->pExpr1[j].base.functionId; @@ -488,7 +488,7 @@ int32_t tsDescOrder(const void* p1, const void* p2) { } } -void orderTheResultRows(SQueryRuntimeEnv* pRuntimeEnv) { +void orderTheResultRows(STaskRuntimeEnv* pRuntimeEnv) { __compar_fn_t fn = NULL; if (pRuntimeEnv->pQueryAttr->order.order == TSDB_ORDER_ASC) { fn = tsAscOrder; @@ -499,7 +499,7 @@ void orderTheResultRows(SQueryRuntimeEnv* pRuntimeEnv) { taosArraySort(pRuntimeEnv->pResultRowArrayList, fn); } -static int32_t mergeIntoGroupResultImplRv(SQueryRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, uint64_t groupId, int32_t* rowCellInfoOffset) { +static int32_t mergeIntoGroupResultImplRv(STaskRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, uint64_t groupId, int32_t* rowCellInfoOffset) { if (!pGroupResInfo->ordered) { orderTheResultRows(pRuntimeEnv); pGroupResInfo->ordered = true; @@ -528,7 +528,7 @@ static int32_t mergeIntoGroupResultImplRv(SQueryRuntimeEnv *pRuntimeEnv, SGroupR return TSDB_CODE_SUCCESS; } -static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, SArray *pTableList, +static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(STaskRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, SArray *pTableList, int32_t* rowCellInfoOffset) { bool ascQuery = QUERY_IS_ASC_QUERY(pRuntimeEnv->pQueryAttr); @@ -630,7 +630,7 @@ static UNUSED_FUNC int32_t mergeIntoGroupResultImpl(SQueryRuntimeEnv *pRuntimeEn return code; } -int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, SQueryRuntimeEnv* pRuntimeEnv, int32_t* offset) { +int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, STaskRuntimeEnv* pRuntimeEnv, int32_t* offset) { int64_t st = taosGetTimestampUs(); while (pGroupResInfo->currentGroup < pGroupResInfo->totalGroup) { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index f119627c69..a22f56753b 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -12,13 +12,13 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -#include #include "os.h" #include "tmsg.h" #include "tglobal.h" #include "ttime.h" - #include "exception.h" + +#include "function.h" #include "executorimpl.h" #include "thash.h" #include "function.h" @@ -41,11 +41,6 @@ #define MULTI_KEY_DELIM "-" -#define TIME_WINDOW_COPY(_dst, _src) do {\ - (_dst).skey = (_src).skey;\ - (_dst).ekey = (_src).ekey;\ -} while (0) - enum { TS_JOIN_TS_EQUAL = 0, TS_JOIN_TS_NOT_EQUALS = 1, @@ -131,40 +126,16 @@ do { \ } \ } while (0) -uint64_t queryHandleId = 0; - int32_t getMaximumIdleDurationSec() { return tsShellActivityTimer * 2; } -int64_t genQueryId(void) { - int64_t uid = 0; - int64_t did = 0;//tsDnodeId; - - uid = did << 54; - - int64_t pid = ((int64_t)taosGetPId()) & 0x3FF; - - uid |= pid << 44; - - int64_t ts = taosGetTimestampMs() & 0x1FFFFFFFF; - - uid |= ts << 11; - - int64_t sid = atomic_add_fetch_64(&queryHandleId, 1) & 0x7FF; - - uid |= sid; - -// //qDebug("gen qid:0x%"PRIx64, uid); - - return uid; -} static int32_t getExprFunctionId(SExprInfo *pExprInfo) { assert(pExprInfo != NULL && pExprInfo->pExpr != NULL && pExprInfo->pExpr->nodeType == TEXPR_UNARYEXPR_NODE); return 0; } -static void getNextTimeWindow(SQueryAttr* pQueryAttr, STimeWindow* tw) { +static void getNextTimeWindow(STaskAttr* pQueryAttr, STimeWindow* tw) { int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order); if (pQueryAttr->interval.intervalUnit != 'n' && pQueryAttr->interval.intervalUnit != 'y') { tw->skey += pQueryAttr->interval.sliding * factor; @@ -198,28 +169,28 @@ static void getNextTimeWindow(SQueryAttr* pQueryAttr, STimeWindow* tw) { } static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes); -static void setResultOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pResult, SQLFunctionCtx* pCtx, +static void setResultOutputBuf(STaskRuntimeEnv* pRuntimeEnv, SResultRow* pResult, SQLFunctionCtx* pCtx, int32_t numOfCols, int32_t* rowCellInfoOffset); -void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset); -static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx); +void setResultRowOutputBufInitCtx(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset); +static bool functionNeedToExecute(STaskRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx); static void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColIndex* pColIndex); static void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo); -static bool hasMainOutput(SQueryAttr *pQueryAttr); +static bool hasMainOutput(STaskAttr *pQueryAttr); static SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols); -static int32_t setTimestampListJoinInfo(SQueryRuntimeEnv* pRuntimeEnv, SVariant* pTag, STableQueryInfo *pTableQueryInfo); +static int32_t setTimestampListJoinInfo(STaskRuntimeEnv* pRuntimeEnv, SVariant* pTag, STableQueryInfo *pTableQueryInfo); static void releaseQueryBuf(size_t numOfTables); static int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order); -//static STsdbQueryCond createTsdbQueryCond(SQueryAttr* pQueryAttr, STimeWindow* win); +//static STsdbQueryCond createTsdbQueryCond(STaskAttr* pQueryAttr, STimeWindow* win); static STableIdInfo createTableIdInfo(STableQueryInfo* pTableQueryInfo); static void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream); -static int32_t getNumOfScanTimes(SQueryAttr* pQueryAttr); +static int32_t getNumOfScanTimes(STaskAttr* pQueryAttr); static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput); static void destroySFillOperatorInfo(void* param, int32_t numOfOutput); @@ -239,25 +210,25 @@ static void doSetOperatorCompleted(SOperatorInfo* pOperator) { } } -static int32_t doCopyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock); +static int32_t doCopyToSDataBlock(STaskRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock); static int32_t getGroupbyColumnIndex(SGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock); -static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *binf, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex); +static int32_t setGroupResultOutputBuf(STaskRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *binf, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex); static void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size); -static void getAlignQueryTimeWindow(SQueryAttr *pQueryAttr, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win); -static void setResultBufSize(SQueryAttr* pQueryAttr, SRspResultInfo* pResultInfo); -static void setCtxTagForJoin(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable); -static void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr); -static void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, char* val, int16_t bytes); -static void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, +static void getAlignQueryTimeWindow(STaskAttr *pQueryAttr, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win); +static void setResultBufSize(STaskAttr* pQueryAttr, SRspResultInfo* pResultInfo); +static void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable); +static void setParamForStableStddev(STaskRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr); +static void setParamForStableStddevByColData(STaskRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, char* val, int16_t bytes); +static void doSetTableGroupOutputBuf(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, SQLFunctionCtx* pCtx, int32_t* rowCellInfoOffset, int32_t numOfOutput, int32_t tableGroupId); -SArray* getOrderCheckColumns(SQueryAttr* pQuery); +SArray* getOrderCheckColumns(STaskAttr* pQuery); typedef struct SRowCompSupporter { - SQueryRuntimeEnv *pRuntimeEnv; + STaskRuntimeEnv *pRuntimeEnv; int16_t dataOffset; __compar_fn_t comFunc; } SRowCompSupporter; @@ -267,7 +238,7 @@ static int compareRowData(const void *a, const void *b, const void *userData) { const SResultRow *pRow2 = (const SResultRow *)b; SRowCompSupporter *supporter = (SRowCompSupporter *)userData; - SQueryRuntimeEnv* pRuntimeEnv = supporter->pRuntimeEnv; + STaskRuntimeEnv* pRuntimeEnv = supporter->pRuntimeEnv; SFilePage *page1 = getResBufPage(pRuntimeEnv->pResultBuf, pRow1->pageId); SFilePage *page2 = getResBufPage(pRuntimeEnv->pResultBuf, pRow2->pageId); @@ -279,7 +250,7 @@ static int compareRowData(const void *a, const void *b, const void *userData) { return (in1 != NULL && in2 != NULL) ? supporter->comFunc(in1, in2) : 0; } -static void sortGroupResByOrderList(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv *pRuntimeEnv, SSDataBlock* pDataBlock) { +static void sortGroupResByOrderList(SGroupResInfo *pGroupResInfo, STaskRuntimeEnv *pRuntimeEnv, SSDataBlock* pDataBlock) { SArray *columnOrderList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr); size_t size = taosArrayGetSize(columnOrderList); taosArrayDestroy(columnOrderList); @@ -375,7 +346,7 @@ static bool isSelectivityWithTagsQuery(SQLFunctionCtx *pCtx, int32_t numOfOutput // return (numOfSelectivity > 0 && hasTags); } -static bool isProjQuery(SQueryAttr *pQueryAttr) { +static bool isProjQuery(STaskAttr *pQueryAttr) { for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) { int32_t functId = getExprFunctionId(&pQueryAttr->pExpr1[i]); if (functId != FUNCTION_PRJ && functId != FUNCTION_TAGPRJ) { @@ -398,7 +369,7 @@ static bool hasNull(SColIndex* pColIndex, SColumnDataAgg *pStatis) { return true; } -static void prepareResultListBuffer(SResultRowInfo* pResultRowInfo, SQueryRuntimeEnv* pRuntimeEnv) { +static void prepareResultListBuffer(SResultRowInfo* pResultRowInfo, STaskRuntimeEnv* pRuntimeEnv) { // more than the capacity, reallocate the resources if (pResultRowInfo->size < pResultRowInfo->capacity) { return; @@ -424,7 +395,7 @@ static void prepareResultListBuffer(SResultRowInfo* pResultRowInfo, SQueryRuntim pResultRowInfo->capacity = (int32_t)newCapacity; } -static bool chkResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, char *pData, +static bool chkResultRowFromKey(STaskRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, char *pData, int16_t bytes, bool masterscan, uint64_t uid) { bool existed = false; SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid); @@ -462,7 +433,7 @@ static bool chkResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *p } -static SResultRow* doSetResultOutBufByKey(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, int64_t tid, +static SResultRow* doSetResultOutBufByKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, int64_t tid, char* pData, int16_t bytes, bool masterscan, uint64_t tableGroupId) { bool existed = false; SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, tableGroupId); @@ -536,7 +507,7 @@ static SResultRow* doSetResultOutBufByKey(SQueryRuntimeEnv* pRuntimeEnv, SResult return pResultRowInfo->pResult[pResultRowInfo->curPos]; } -static void getInitialStartTimeWindow(SQueryAttr* pQueryAttr, TSKEY ts, STimeWindow* w) { +static void getInitialStartTimeWindow(STaskAttr* pQueryAttr, TSKEY ts, STimeWindow* w) { if (QUERY_IS_ASC_QUERY(pQueryAttr)) { getAlignQueryTimeWindow(pQueryAttr, ts, ts, pQueryAttr->window.ekey, w); } else { @@ -561,7 +532,7 @@ static void getInitialStartTimeWindow(SQueryAttr* pQueryAttr, TSKEY ts, STimeWin } // get the correct time window according to the handled timestamp -static STimeWindow getActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t ts, SQueryAttr *pQueryAttr) { +static STimeWindow getActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t ts, STaskAttr *pQueryAttr) { STimeWindow w = {0}; if (pResultRowInfo->curPos == -1) { // the first window, from the previous stored value @@ -609,7 +580,7 @@ static STimeWindow getActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t } // get the correct time window according to the handled timestamp -static STimeWindow getCurrentActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t ts, SQueryAttr *pQueryAttr) { +static STimeWindow getCurrentActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t ts, STaskAttr *pQueryAttr) { STimeWindow w = {0}; if (pResultRowInfo->curPos == -1) { // the first window, from the previous stored value @@ -680,14 +651,14 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf return 0; } -static bool chkWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, STimeWindow *win, +static bool chkWindowOutputBufByKey(STaskRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, STimeWindow *win, bool masterscan, SResultRow **pResult, int64_t groupId, SQLFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset) { assert(win->skey <= win->ekey); return chkResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char *)&win->skey, TSDB_KEYSIZE, masterscan, groupId); } -static int32_t setResultOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, int64_t tid, STimeWindow *win, +static int32_t setResultOutputBufByKey(STaskRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, int64_t tid, STimeWindow *win, bool masterscan, SResultRow **pResult, int64_t tableGroupId, SQLFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset) { assert(win->skey <= win->ekey); @@ -816,7 +787,7 @@ static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey, } } -static void updateResultRowInfoActiveIndex(SResultRowInfo* pResultRowInfo, SQueryAttr* pQueryAttr, TSKEY lastKey) { +static void updateResultRowInfoActiveIndex(SResultRowInfo* pResultRowInfo, STaskAttr* pQueryAttr, TSKEY lastKey) { bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr); if ((lastKey > pQueryAttr->window.ekey && ascQuery) || (lastKey < pQueryAttr->window.ekey && (!ascQuery))) { closeAllResultRows(pResultRowInfo); @@ -827,10 +798,10 @@ static void updateResultRowInfoActiveIndex(SResultRowInfo* pResultRowInfo, SQuer } } -static int32_t getNumOfRowsInTimeWindow(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo *pDataBlockInfo, TSKEY *pPrimaryColumn, +static int32_t getNumOfRowsInTimeWindow(STaskRuntimeEnv* pRuntimeEnv, SDataBlockInfo *pDataBlockInfo, TSKEY *pPrimaryColumn, int32_t startPos, TSKEY ekey, __block_search_fn_t searchFn, bool updateLastKey) { assert(startPos >= 0 && startPos < pDataBlockInfo->rows); - SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; + STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; STableQueryInfo* item = pRuntimeEnv->current; int32_t num = -1; @@ -867,9 +838,9 @@ static int32_t getNumOfRowsInTimeWindow(SQueryRuntimeEnv* pRuntimeEnv, SDataBloc return num; } -static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, STimeWindow* pWin, int32_t offset, +static void doApplyFunctions(STaskRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, STimeWindow* pWin, int32_t offset, int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput) { - SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; + STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; bool hasAggregates = pCtx[0].isAggSet; for (int32_t k = 0; k < numOfOutput; ++k) { @@ -904,7 +875,7 @@ static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx } } -static int32_t getNextQualifiedWindow(SQueryAttr* pQueryAttr, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo, +static int32_t getNextQualifiedWindow(STaskAttr* pQueryAttr, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo, TSKEY* primaryKeys, __block_search_fn_t searchFn, int32_t prevPosition) { getNextTimeWindow(pQueryAttr, pNext); @@ -983,7 +954,7 @@ static int32_t getNextQualifiedWindow(SQueryAttr* pQueryAttr, STimeWindow* pNext return startPos; } -static FORCE_INLINE TSKEY reviseWindowEkey(SQueryAttr *pQueryAttr, STimeWindow *pWindow) { +static FORCE_INLINE TSKEY reviseWindowEkey(STaskAttr *pQueryAttr, STimeWindow *pWindow) { TSKEY ekey = -1; if (QUERY_IS_ASC_QUERY(pQueryAttr)) { ekey = pWindow->ekey; @@ -1012,20 +983,20 @@ static void setNotInterpoWindowKey(SQLFunctionCtx* pCtx, int32_t numOfOutput, in } } -static void saveDataBlockLastRow(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pDataBlockInfo, SArray* pDataBlock, +static void saveDataBlockLastRow(STaskRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pDataBlockInfo, SArray* pDataBlock, int32_t rowIndex) { if (pDataBlock == NULL) { return; } - SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; + STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; for (int32_t k = 0; k < pQueryAttr->numOfCols; ++k) { SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, k); memcpy(pRuntimeEnv->prevRow[k], ((char*)pColInfo->pData) + (pColInfo->info.bytes * rowIndex), pColInfo->info.bytes); } } -static TSKEY getStartTsKey(SQueryAttr* pQueryAttr, STimeWindow* win, const TSKEY* tsCols, int32_t rows) { +static TSKEY getStartTsKey(STaskAttr* pQueryAttr, STimeWindow* win, const TSKEY* tsCols, int32_t rows) { TSKEY ts = TSKEY_INITIAL_VAL; bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr); @@ -1126,7 +1097,7 @@ static void doSetInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, } static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunctionCtx* pCtx, SSDataBlock* pSDataBlock) { - SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; + STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; for (int32_t k = 0; k < pOperator->numOfOutput; ++k) { if (functionNeedToExecute(pRuntimeEnv, &pCtx[k])) { @@ -1136,8 +1107,8 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunction } } -static void projectApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t numOfOutput) { - SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; +static void projectApplyFunctions(STaskRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t numOfOutput) { + STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; for (int32_t k = 0; k < numOfOutput; ++k) { pCtx[k].startTs = pQueryAttr->window.skey; @@ -1161,7 +1132,7 @@ static void projectApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs, int32_t curRowIndex, TSKEY windowKey, int32_t type) { - SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; + STaskRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; SExprInfo* pExpr = pOperator->pExpr; SQLFunctionCtx* pCtx = pInfo->pCtx; @@ -1226,8 +1197,8 @@ void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, static bool setTimeWindowInterpolationStartTs(SOperatorInfo* pOperatorInfo, SQLFunctionCtx* pCtx, int32_t pos, int32_t numOfRows, SArray* pDataBlock, const TSKEY* tsCols, STimeWindow* win) { - SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv; - SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; + STaskRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv; + STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr); @@ -1257,8 +1228,8 @@ static bool setTimeWindowInterpolationStartTs(SOperatorInfo* pOperatorInfo, SQLF static bool setTimeWindowInterpolationEndTs(SOperatorInfo* pOperatorInfo, SQLFunctionCtx* pCtx, int32_t endRowIndex, SArray* pDataBlock, const TSKEY* tsCols, TSKEY blockEkey, STimeWindow* win) { - SQueryRuntimeEnv *pRuntimeEnv = pOperatorInfo->pRuntimeEnv; - SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; + STaskRuntimeEnv *pRuntimeEnv = pOperatorInfo->pRuntimeEnv; + STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; int32_t numOfOutput = pOperatorInfo->numOfOutput; TSKEY actualEndKey = tsCols[endRowIndex]; @@ -1289,8 +1260,8 @@ static bool setTimeWindowInterpolationEndTs(SOperatorInfo* pOperatorInfo, SQLFun static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBlock* pBlock, SQLFunctionCtx* pCtx, SResultRow* pResult, STimeWindow* win, int32_t startPos, int32_t forwardStep) { - SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv; - SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; + STaskRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv; + STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; if (!pQueryAttr->timeWindowInterpo) { return; } @@ -1340,9 +1311,9 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, int32_t tableGroupId) { STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*) pOperatorInfo->info; - SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv; + STaskRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv; int32_t numOfOutput = pOperatorInfo->numOfOutput; - SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; + STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order); bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr); @@ -1450,9 +1421,9 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, int32_t tableGroupId) { STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*) pOperatorInfo->info; - SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv; + STaskRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv; int32_t numOfOutput = pOperatorInfo->numOfOutput; - SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; + STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order); bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr); @@ -1525,12 +1496,12 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pInfo, SSDataBlock *pSDataBlock) { - SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; + STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; STableQueryInfo* item = pRuntimeEnv->current; SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, pInfo->colIndex); - SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; + STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; int16_t bytes = pColInfoData->info.bytes; int16_t type = pColInfoData->info.type; @@ -1607,7 +1578,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn } static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInfo *pInfo, SSDataBlock *pSDataBlock) { - SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; + STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; STableQueryInfo* item = pRuntimeEnv->current; // primary timestamp column @@ -1692,7 +1663,7 @@ static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) { } } -static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *binfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex) { +static int32_t setGroupResultOutputBuf(STaskRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *binfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex) { SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; int32_t *rowCellInfoOffset = binfo->rowCellInfoOffset; @@ -1746,9 +1717,9 @@ static int32_t getGroupbyColumnIndex(SGroupbyExpr *pGroupbyExpr, SSDataBlock* pD return -1; } -static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx) { +static bool functionNeedToExecute(STaskRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx) { struct SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); - SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; + STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; // in case of timestamp column, always generated results. int32_t functionId = pCtx->functionId; @@ -1843,9 +1814,9 @@ static int32_t setCtxTagColumnInfo(SQLFunctionCtx *pCtx, int32_t numOfOutput) { return TSDB_CODE_SUCCESS; } -static SQLFunctionCtx* createSQLFunctionCtx(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput, +static SQLFunctionCtx* createSQLFunctionCtx(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput, int32_t** rowCellInfoOffset) { - SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; + STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; SQLFunctionCtx * pFuncCtx = (SQLFunctionCtx *)calloc(numOfOutput, sizeof(SQLFunctionCtx)); if (pFuncCtx == NULL) { @@ -1965,9 +1936,9 @@ static void* destroySQLFunctionCtx(SQLFunctionCtx* pCtx, int32_t numOfOutput) { return NULL; } -static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOfTables, SArray* pOperator, void* merger) { +static int32_t setupQueryRuntimeEnv(STaskRuntimeEnv *pRuntimeEnv, int32_t numOfTables, SArray* pOperator, void* merger) { //qDebug("QInfo:0x%"PRIx64" setup runtime env", GET_QID(pRuntimeEnv)); - SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; + STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; pRuntimeEnv->prevGroupId = INT32_MIN; pRuntimeEnv->pQueryAttr = pQueryAttr; @@ -2187,8 +2158,8 @@ _clean: return TSDB_CODE_QRY_OUT_OF_MEMORY; } -static void doFreeQueryHandle(SQueryRuntimeEnv* pRuntimeEnv) { - SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; +static void doFreeQueryHandle(STaskRuntimeEnv* pRuntimeEnv) { + STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; // tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle); pRuntimeEnv->pQueryHandle = NULL; @@ -2197,7 +2168,7 @@ static void doFreeQueryHandle(SQueryRuntimeEnv* pRuntimeEnv) { // assert(pMemRef->ref == 0 && pMemRef->snapshot.imem == NULL && pMemRef->snapshot.mem == NULL); } -static void destroyTsComp(SQueryRuntimeEnv *pRuntimeEnv, SQueryAttr *pQueryAttr) { +static void destroyTsComp(STaskRuntimeEnv *pRuntimeEnv, STaskAttr *pQueryAttr) { if (pQueryAttr->tsCompQuery && pRuntimeEnv->outputBuf && pRuntimeEnv->outputBuf->pDataBlock && taosArrayGetSize(pRuntimeEnv->outputBuf->pDataBlock) > 0) { SColumnInfoData* pColInfoData = taosArrayGet(pRuntimeEnv->outputBuf->pDataBlock, 0); if (pColInfoData) { @@ -2210,8 +2181,8 @@ static void destroyTsComp(SQueryRuntimeEnv *pRuntimeEnv, SQueryAttr *pQueryAttr) } } -static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { - SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; +static void teardownQueryRuntimeEnv(STaskRuntimeEnv *pRuntimeEnv) { + STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; SQInfo* pQInfo = (SQInfo*) pRuntimeEnv->qinfo; //qDebug("QInfo:0x%"PRIx64" teardown runtime env", pQInfo->qId); @@ -2271,7 +2242,7 @@ bool isQueryKilled(SQInfo *pQInfo) { void setQueryKilled(SQInfo *pQInfo) { pQInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED;} -//static bool isFixedOutputQuery(SQueryAttr* pQueryAttr) { +//static bool isFixedOutputQuery(STaskAttr* pQueryAttr) { // if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) { // return false; // } @@ -2297,7 +2268,7 @@ void setQueryKilled(SQInfo *pQInfo) { pQInfo->code = TSDB_CODE_TSC_QUERY_CANCELL //} // todo refactor with isLastRowQuery -//bool isPointInterpoQuery(SQueryAttr *pQueryAttr) { +//bool isPointInterpoQuery(STaskAttr *pQueryAttr) { // for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) { // int32_t functionId = pQueryAttr->pExpr1[i].base.functionId; // if (functionId == FUNCTION_INTERP) { @@ -2308,7 +2279,7 @@ void setQueryKilled(SQInfo *pQInfo) { pQInfo->code = TSDB_CODE_TSC_QUERY_CANCELL // return false; //} -static bool isFirstLastRowQuery(SQueryAttr *pQueryAttr) { +static bool isFirstLastRowQuery(STaskAttr *pQueryAttr) { for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) { int32_t functionID = getExprFunctionId(&pQueryAttr->pExpr1[i]); if (functionID == FUNCTION_LAST_ROW) { @@ -2319,7 +2290,7 @@ static bool isFirstLastRowQuery(SQueryAttr *pQueryAttr) { return false; } -static bool isCachedLastQuery(SQueryAttr *pQueryAttr) { +static bool isCachedLastQuery(STaskAttr *pQueryAttr) { for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) { int32_t functionId = getExprFunctionId(&pQueryAttr->pExpr1[i]); if (functionId == FUNCTION_LAST || functionId == FUNCTION_LAST_DST) { @@ -2354,7 +2325,7 @@ static bool isCachedLastQuery(SQueryAttr *pQueryAttr) { * The following 4 kinds of query are treated as the tags query * tagprj, tid_tag query, count(tbname), 'abc' (user defined constant value column) query */ -bool onlyQueryTags(SQueryAttr* pQueryAttr) { +bool onlyQueryTags(STaskAttr* pQueryAttr) { for(int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) { SExprInfo* pExprInfo = &pQueryAttr->pExpr1[i]; @@ -2373,7 +2344,7 @@ bool onlyQueryTags(SQueryAttr* pQueryAttr) { ///////////////////////////////////////////////////////////////////////////////////////////// -void getAlignQueryTimeWindow(SQueryAttr *pQueryAttr, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win) { +void getAlignQueryTimeWindow(STaskAttr *pQueryAttr, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win) { assert(key >= keyFirst && key <= keyLast && pQueryAttr->interval.sliding <= pQueryAttr->interval.interval); win->skey = taosTimeTruncate(key, &pQueryAttr->interval, pQueryAttr->precision); @@ -2394,7 +2365,7 @@ void getAlignQueryTimeWindow(SQueryAttr *pQueryAttr, int64_t key, int64_t keyFir /* * todo add more parameters to check soon.. */ -bool colIdCheck(SQueryAttr *pQueryAttr, uint64_t qId) { +bool colIdCheck(STaskAttr *pQueryAttr, uint64_t qId) { // load data column information is incorrect for (int32_t i = 0; i < pQueryAttr->numOfCols - 1; ++i) { if (pQueryAttr->tableCols[i].colId == pQueryAttr->tableCols[i + 1].colId) { @@ -2408,7 +2379,7 @@ bool colIdCheck(SQueryAttr *pQueryAttr, uint64_t qId) { // todo ignore the avg/sum/min/max/count/stddev/top/bottom functions, of which // the scan order is not matter -static bool onlyOneQueryType(SQueryAttr *pQueryAttr, int32_t functId, int32_t functIdDst) { +static bool onlyOneQueryType(STaskAttr *pQueryAttr, int32_t functId, int32_t functIdDst) { for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) { int32_t functionId = getExprFunctionId(&pQueryAttr->pExpr1[i]); @@ -2425,13 +2396,13 @@ static bool onlyOneQueryType(SQueryAttr *pQueryAttr, int32_t functId, int32_t fu return true; } -static bool onlyFirstQuery(SQueryAttr *pQueryAttr) { return onlyOneQueryType(pQueryAttr, FUNCTION_FIRST, FUNCTION_FIRST_DST); } +static bool onlyFirstQuery(STaskAttr *pQueryAttr) { return onlyOneQueryType(pQueryAttr, FUNCTION_FIRST, FUNCTION_FIRST_DST); } -static bool onlyLastQuery(SQueryAttr *pQueryAttr) { return onlyOneQueryType(pQueryAttr, FUNCTION_LAST, FUNCTION_LAST_DST); } +static bool onlyLastQuery(STaskAttr *pQueryAttr) { return onlyOneQueryType(pQueryAttr, FUNCTION_LAST, FUNCTION_LAST_DST); } -static bool notContainSessionOrStateWindow(SQueryAttr *pQueryAttr) { return !(pQueryAttr->sw.gap > 0 || pQueryAttr->stateWindow); } +static bool notContainSessionOrStateWindow(STaskAttr *pQueryAttr) { return !(pQueryAttr->sw.gap > 0 || pQueryAttr->stateWindow); } -static int32_t updateBlockLoadStatus(SQueryAttr *pQuery, int32_t status) { +static int32_t updateBlockLoadStatus(STaskAttr *pQuery, int32_t status) { bool hasFirstLastFunc = false; bool hasOtherFunc = false; @@ -2465,7 +2436,7 @@ static int32_t updateBlockLoadStatus(SQueryAttr *pQuery, int32_t status) { return status; } -static void doUpdateLastKey(SQueryAttr* pQueryAttr) { +static void doUpdateLastKey(STaskAttr* pQueryAttr) { STimeWindow* win = &pQueryAttr->window; size_t num = taosArrayGetSize(pQueryAttr->tableGroupInfo.pGroupList); @@ -2485,7 +2456,7 @@ static void doUpdateLastKey(SQueryAttr* pQueryAttr) { } static void updateDataCheckOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bool stableQuery) { - SQueryAttr* pQueryAttr = pQInfo->runtimeEnv.pQueryAttr; + STaskAttr* pQueryAttr = pQInfo->runtimeEnv.pQueryAttr; // in case of point-interpolation query, use asc order scan char msg[] = "QInfo:0x%"PRIx64" scan order changed for %s query, old:%d, new:%d, qrange exchanged, old qrange:%" PRId64 @@ -2580,8 +2551,8 @@ static void updateDataCheckOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bool } } -static void getIntermediateBufInfo(SQueryRuntimeEnv* pRuntimeEnv, int32_t* ps, int32_t* rowsize) { - SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; +static void getIntermediateBufInfo(STaskRuntimeEnv* pRuntimeEnv, int32_t* ps, int32_t* rowsize) { + STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; int32_t MIN_ROWS_PER_PAGE = 4; *rowsize = (int32_t)(pQueryAttr->resultRowSize * getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)); @@ -2596,8 +2567,8 @@ static void getIntermediateBufInfo(SQueryRuntimeEnv* pRuntimeEnv, int32_t* ps, i #define IS_PREFILTER_TYPE(_t) ((_t) != TSDB_DATA_TYPE_BINARY && (_t) != TSDB_DATA_TYPE_NCHAR) -//static FORCE_INLINE bool doFilterByBlockStatistics(SQueryRuntimeEnv* pRuntimeEnv, SDataStatis *pDataStatis, SQLFunctionCtx *pCtx, int32_t numOfRows) { -// SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; +//static FORCE_INLINE bool doFilterByBlockStatistics(STaskRuntimeEnv* pRuntimeEnv, SDataStatis *pDataStatis, SQLFunctionCtx *pCtx, int32_t numOfRows) { +// STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; // // if (pDataStatis == NULL || pQueryAttr->pFilters == NULL) { // return true; @@ -2606,7 +2577,7 @@ static void getIntermediateBufInfo(SQueryRuntimeEnv* pRuntimeEnv, int32_t* ps, i // return filterRangeExecute(pQueryAttr->pFilters, pDataStatis, pQueryAttr->numOfCols, numOfRows); //} -static bool overlapWithTimeWindow(SQueryAttr* pQueryAttr, SDataBlockInfo* pBlockInfo) { +static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockInfo) { STimeWindow w = {0}; TSKEY sk = MIN(pQueryAttr->window.skey, pQueryAttr->window.ekey); @@ -2655,7 +2626,7 @@ static bool overlapWithTimeWindow(SQueryAttr* pQueryAttr, SDataBlockInfo* pBlock return false; } -static int32_t doTSJoinFilter(SQueryRuntimeEnv *pRuntimeEnv, TSKEY key, bool ascQuery) { +static int32_t doTSJoinFilter(STaskRuntimeEnv *pRuntimeEnv, TSKEY key, bool ascQuery) { STSElem elem = tsBufGetElem(pRuntimeEnv->pTsBuf); #if defined(_DEBUG_VIEW) @@ -2781,7 +2752,7 @@ void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p) { } } -void filterRowsInDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, +void filterRowsInDataBlock(STaskRuntimeEnv* pRuntimeEnv, SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock, bool ascQuery) { int32_t numOfRows = pBlock->info.rows; @@ -2823,7 +2794,7 @@ void filterRowsInDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SSingleColumnFilterInf tfree(p); } -void filterColRowsInDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock, bool ascQuery) { +void filterColRowsInDataBlock(STaskRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock, bool ascQuery) { int32_t numOfRows = pBlock->info.rows; int8_t *p = NULL; @@ -2913,13 +2884,13 @@ void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFi } } -int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, +int32_t loadDataBlockOnDemand(STaskRuntimeEnv* pRuntimeEnv, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { *status = BLK_DATA_NO_NEEDED; pBlock->pDataBlock = NULL; pBlock->pBlockAgg = NULL; - SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; + STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; int64_t groupId = pRuntimeEnv->current->groupIndex; bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr); @@ -3184,10 +3155,10 @@ static SColumnInfo* doGetTagColumnInfoById(SColumnInfo* pTagColList, int32_t num } void setTagValue(SOperatorInfo* pOperatorInfo, void *pTable, SQLFunctionCtx* pCtx, int32_t numOfOutput) { - SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv; + STaskRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv; SExprInfo *pExpr = pOperatorInfo->pExpr; - SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; + STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; SExprInfo* pExprInfo = &pExpr[0]; int32_t functionId = getExprFunctionId(pExprInfo); @@ -3242,7 +3213,7 @@ void setTagValue(SOperatorInfo* pOperatorInfo, void *pTable, SQLFunctionCtx* pCt } } -void copyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, int32_t threshold, SSDataBlock* pBlock, int32_t* offset) { +void copyToSDataBlock(STaskRuntimeEnv* pRuntimeEnv, int32_t threshold, SSDataBlock* pBlock, int32_t* offset) { SGroupResInfo* pGroupResInfo = &pRuntimeEnv->groupResInfo; pBlock->info.rows = 0; @@ -3293,8 +3264,8 @@ static void updateTableQueryInfoForReverseScan(STableQueryInfo *pTableQueryInfo) } } -static void setupQueryRangeForReverseScan(SQueryRuntimeEnv* pRuntimeEnv) { - SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; +static void setupQueryRangeForReverseScan(STaskRuntimeEnv* pRuntimeEnv) { + STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; int32_t numOfGroups = (int32_t)(GET_NUM_OF_TABLEGROUP(pRuntimeEnv)); for(int32_t i = 0; i < numOfGroups; ++i) { @@ -3337,7 +3308,7 @@ int32_t initResultRow(SResultRow *pResultRow) { * +------------+-------------------------------------------+-------------------------------------------+ * offset[0] offset[1] offset[2] */ -void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, int64_t uid, int32_t stage) { +void setDefaultOutputBuf(STaskRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, int64_t uid, int32_t stage) { SQLFunctionCtx* pCtx = pInfo->pCtx; SSDataBlock* pDataBlock = pInfo->pRes; int32_t* rowCellInfoOffset = pInfo->rowCellInfoOffset; @@ -3461,7 +3432,7 @@ void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size) { } } -void setQueryStatus(SQueryRuntimeEnv *pRuntimeEnv, int8_t status) { +void setQueryStatus(STaskRuntimeEnv *pRuntimeEnv, int8_t status) { if (status == QUERY_NOT_COMPLETED) { pRuntimeEnv->status = status; } else { @@ -3471,8 +3442,8 @@ void setQueryStatus(SQueryRuntimeEnv *pRuntimeEnv, int8_t status) { } } -static void setupEnvForReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, SQLFunctionCtx* pCtx, int32_t numOfOutput) { - SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; +static void setupEnvForReverseScan(STaskRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, SQLFunctionCtx* pCtx, int32_t numOfOutput) { + STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; if (pRuntimeEnv->pTsBuf) { SWITCH_ORDER(pRuntimeEnv->pTsBuf->cur.order); @@ -3493,8 +3464,8 @@ static void setupEnvForReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo } void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) { - SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; - SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; + STaskRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; + STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; int32_t numOfOutput = pOperator->numOfOutput; if (pQueryAttr->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQueryAttr) || pQueryAttr->sw.gap > 0 || pQueryAttr->stateWindow) { @@ -3539,7 +3510,7 @@ void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResult } } -static bool hasMainOutput(SQueryAttr *pQueryAttr) { +static bool hasMainOutput(STaskAttr *pQueryAttr) { for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) { int32_t functionId = getExprFunctionId(&pQueryAttr->pExpr1[i]); @@ -3551,7 +3522,7 @@ static bool hasMainOutput(SQueryAttr *pQueryAttr) { return false; } -STableQueryInfo *createTableQueryInfo(SQueryAttr* pQueryAttr, void* pTable, bool groupbyColumn, STimeWindow win, void* buf) { +STableQueryInfo *createTableQueryInfo(STaskAttr* pQueryAttr, void* pTable, bool groupbyColumn, STimeWindow win, void* buf) { STableQueryInfo *pTableQueryInfo = buf; pTableQueryInfo->win = win; @@ -3602,7 +3573,7 @@ void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo) { cleanupResultRowInfo(&pTableQueryInfo->resInfo); } -void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, +void setResultRowOutputBufInitCtx(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset) { // Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group SFilePage* bufPage = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pageId); @@ -3635,7 +3606,7 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe } } -void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, SQLFunctionCtx* pCtx, +void doSetTableGroupOutputBuf(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, SQLFunctionCtx* pCtx, int32_t* rowCellInfoOffset, int32_t numOfOutput, int32_t tableGroupId) { // for simple group by query without interval, all the tables belong to one group result. int64_t uid = 0; @@ -3659,7 +3630,7 @@ void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pRe setResultRowOutputBufInitCtx(pRuntimeEnv, pResultRow, pCtx, numOfOutput, rowCellInfoOffset); } -void setExecutionContext(SQueryRuntimeEnv* pRuntimeEnv, SOptrBasicInfo* pInfo, int32_t numOfOutput, int32_t tableGroupId, +void setExecutionContext(STaskRuntimeEnv* pRuntimeEnv, SOptrBasicInfo* pInfo, int32_t numOfOutput, int32_t tableGroupId, TSKEY nextKey) { STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current; @@ -3675,7 +3646,7 @@ void setExecutionContext(SQueryRuntimeEnv* pRuntimeEnv, SOptrBasicInfo* pInfo, i pRuntimeEnv->prevGroupId = tableGroupId; } -void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, +void setResultOutputBuf(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, int32_t numOfCols, int32_t* rowCellInfoOffset) { // Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group SFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pageId); @@ -3698,8 +3669,8 @@ void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLF } } -void setCtxTagForJoin(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable) { - SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; +void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable) { + STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; SSqlExpr* pExpr = &pExprInfo->base; // if (pQueryAttr->stableQuery && (pRuntimeEnv->pTsBuf != NULL) && @@ -3723,8 +3694,8 @@ void setCtxTagForJoin(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, SExpr // } } -int32_t setTimestampListJoinInfo(SQueryRuntimeEnv* pRuntimeEnv, SVariant* pTag, STableQueryInfo *pTableQueryInfo) { - SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; +int32_t setTimestampListJoinInfo(STaskRuntimeEnv* pRuntimeEnv, SVariant* pTag, STableQueryInfo *pTableQueryInfo) { + STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; assert(pRuntimeEnv->pTsBuf != NULL); @@ -3766,9 +3737,9 @@ int32_t setTimestampListJoinInfo(SQueryRuntimeEnv* pRuntimeEnv, SVariant* pTag, } // TODO refactor: this funciton should be merged with setparamForStableStddevColumnData function. -void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExprInfo) { +void setParamForStableStddev(STaskRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExprInfo) { #if 0 - SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; + STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; int32_t numOfExprs = pQueryAttr->numOfOutput; for(int32_t i = 0; i < numOfExprs; ++i) { @@ -3801,8 +3772,8 @@ void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx #endif } -void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, char* val, int16_t bytes) { - SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; +void setParamForStableStddevByColData(STaskRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, char* val, int16_t bytes) { + STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; #if 0 int32_t numOfExprs = pQueryAttr->numOfOutput; for(int32_t i = 0; i < numOfExprs; ++i) { @@ -3842,8 +3813,8 @@ void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunction * merged during merge stage. In this case, we need the pTableQueryInfo->lastResRows to decide if there * is a previous result generated or not. */ -void setIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, TSKEY key) { - SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; +void setIntervalQueryRange(STaskRuntimeEnv *pRuntimeEnv, TSKEY key) { + STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current; SResultRowInfo *pResultRowInfo = &pTableQueryInfo->resInfo; @@ -3887,8 +3858,8 @@ void setIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, TSKEY key) { * @param result */ -static int32_t doCopyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock) { - SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; +static int32_t doCopyToSDataBlock(STaskRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock) { + STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); int32_t numOfResult = pBlock->info.rows; // there are already exists result rows @@ -3946,7 +3917,7 @@ static int32_t doCopyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* return 0; } -static void toSSDataBlock(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock) { +static void toSSDataBlock(SGroupResInfo *pGroupResInfo, STaskRuntimeEnv* pRuntimeEnv, SSDataBlock* pBlock) { assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup); pBlock->info.rows = 0; @@ -3954,7 +3925,7 @@ static void toSSDataBlock(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv* pRunti return; } - SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; + STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; int32_t orderType = TSDB_ORDER_ASC;//(pQueryAttr->pGroupbyExpr != NULL) ? pQueryAttr->pGroupbyExpr->orderType : TSDB_ORDER_ASC; doCopyToSDataBlock(pRuntimeEnv, pGroupResInfo, orderType, pBlock); @@ -3969,9 +3940,9 @@ static void toSSDataBlock(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv* pRunti } } -static void updateNumOfRowsInResultRows(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, +static void updateNumOfRowsInResultRows(STaskRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) { - SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; + STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; // update the number of result for each, only update the number of rows for the corresponding window result. if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) { @@ -4000,8 +3971,8 @@ static int32_t compressQueryColData(SColumnInfoData *pColRes, int32_t numOfRows, } static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data, int8_t compressed, int32_t *compLen) { - SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; - SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; + STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; + STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; SSDataBlock* pRes = pRuntimeEnv->outputBuf; @@ -4187,7 +4158,7 @@ void calculateOperatorProfResults(SQInfo* pQInfo) { } void queryCostStatis(SQInfo *pQInfo) { - SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; + STaskRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQueryCostInfo *pSummary = &pQInfo->summary; uint64_t hashSize = taosHashGetMemSize(pQInfo->runtimeEnv.pResultRowHashTable); @@ -4226,8 +4197,8 @@ void queryCostStatis(SQInfo *pQInfo) { } } -//static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBlockInfo) { -// SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; +//static void updateOffsetVal(STaskRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBlockInfo) { +// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; // STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; // // int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order); @@ -4262,8 +4233,8 @@ void queryCostStatis(SQInfo *pQInfo) { // pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, pQuery->current->lastKey); //} -//void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) { -// SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; +//void skipBlocks(STaskRuntimeEnv *pRuntimeEnv) { +// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; // // if (pQueryAttr->limit.offset <= 0 || pQueryAttr->numOfFilterCols > 0) { // return; @@ -4301,8 +4272,8 @@ void queryCostStatis(SQInfo *pQInfo) { // } //} -//static TSKEY doSkipIntervalProcess(SQueryRuntimeEnv* pRuntimeEnv, STimeWindow* win, SDataBlockInfo* pBlockInfo, STableQueryInfo* pTableQueryInfo) { -// SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; +//static TSKEY doSkipIntervalProcess(STaskRuntimeEnv* pRuntimeEnv, STimeWindow* win, SDataBlockInfo* pBlockInfo, STableQueryInfo* pTableQueryInfo) { +// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; // SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo; // // assert(pQueryAttr->limit.offset == 0); @@ -4352,8 +4323,8 @@ void queryCostStatis(SQInfo *pQInfo) { // return true; //} -//static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv, TSKEY* start) { -// SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; +//static bool skipTimeInterval(STaskRuntimeEnv *pRuntimeEnv, TSKEY* start) { +// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; // if (QUERY_IS_ASC_QUERY(pQueryAttr)) { // assert(*start <= pRuntimeEnv->current->lastKey); // } else { @@ -4463,18 +4434,18 @@ void queryCostStatis(SQInfo *pQInfo) { //} void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream) { - if (p->upstream == NULL) { + if (p->pDownstream == NULL) { assert(p->numOfUpstream == 0); } - p->upstream = realloc(p->upstream, POINTER_BYTES * (p->numOfUpstream + 1)); - p->upstream[p->numOfUpstream++] = pUpstream; + p->pDownstream = realloc(p->pDownstream, POINTER_BYTES * (p->numOfUpstream + 1)); + p->pDownstream[p->numOfUpstream++] = pUpstream; } static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo); -static int32_t setupQueryHandle(void* tsdb, SQueryRuntimeEnv* pRuntimeEnv, int64_t qId, bool isSTableQuery) { - SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; +static int32_t setupQueryHandle(void* tsdb, STaskRuntimeEnv* pRuntimeEnv, int64_t qId, bool isSTableQuery) { + STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; #if 0 // TODO set the tags scan handle if (onlyQueryTags(pQueryAttr)) { @@ -4533,9 +4504,9 @@ static int32_t setupQueryHandle(void* tsdb, SQueryRuntimeEnv* pRuntimeEnv, int64 int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr, int32_t tbScanner, SArray* pOperator, void* param) { - SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; + STaskRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; - SQueryAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr; + STaskAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr; pQueryAttr->tsdb = tsdb; if (tsdb != NULL) { @@ -4620,7 +4591,7 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr return TSDB_CODE_SUCCESS; } -static void doTableQueryInfoTimeWindowCheck(SQueryAttr* pQueryAttr, STableQueryInfo* pTableQueryInfo) { +static void doTableQueryInfoTimeWindowCheck(STaskAttr* pQueryAttr, STableQueryInfo* pTableQueryInfo) { if (QUERY_IS_ASC_QUERY(pQueryAttr)) { assert( (pTableQueryInfo->win.skey <= pTableQueryInfo->win.ekey) && @@ -4634,7 +4605,7 @@ static void doTableQueryInfoTimeWindowCheck(SQueryAttr* pQueryAttr, STableQueryI } } -//STsdbQueryCond createTsdbQueryCond(SQueryAttr* pQueryAttr, STimeWindow* win) { +//STsdbQueryCond createTsdbQueryCond(STaskAttr* pQueryAttr, STimeWindow* win) { // STsdbQueryCond cond = { // .colList = pQueryAttr->tableCols, // .order = pQueryAttr->order.order, @@ -4676,7 +4647,7 @@ static STableIdInfo createTableIdInfo(STableQueryInfo* pTableQueryInfo) { // } //} -static void doCloseAllTimeWindow(SQueryRuntimeEnv* pRuntimeEnv) { +static void doCloseAllTimeWindow(STaskRuntimeEnv* pRuntimeEnv) { size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv); for (int32_t i = 0; i < numOfGroup; ++i) { SArray* group = GET_TABLEGROUP(pRuntimeEnv, i); @@ -4694,8 +4665,8 @@ static SSDataBlock* doTableScanImpl(void* param, bool* newgroup) { STableScanInfo *pTableScanInfo = pOperator->info; SSDataBlock *pBlock = &pTableScanInfo->block; - SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; - SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; + STaskRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; + STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; STableGroupInfo *pTableGroupInfo = &pOperator->pRuntimeEnv->tableqinfoGroupInfo; *newgroup = false; @@ -4751,8 +4722,8 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) { SOperatorInfo* pOperator = (SOperatorInfo*) param; STableScanInfo *pTableScanInfo = pOperator->info; - SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; - SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; + STaskRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; + STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; SResultRowInfo* pResultRowInfo = pTableScanInfo->pResultRowInfo; *newgroup = false; @@ -4867,7 +4838,7 @@ static SSDataBlock* doBlockInfoScan(void* param, bool* newgroup) { } -SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime) { +SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv, int32_t repeatTime) { assert(repeatTime > 0); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); @@ -4891,7 +4862,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* return pOperator; } -SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv) { +SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv) { STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); pInfo->pQueryHandle = pTsdbQueryHandle; @@ -4915,7 +4886,7 @@ SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeE return pOperator; } -SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv) { +SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv) { STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); pInfo->pQueryHandle = pTsdbQueryHandle; @@ -4998,7 +4969,7 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf } -SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime) { +SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, STaskRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime) { assert(repeatTime > 0); STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo)); @@ -5019,7 +4990,7 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime return pOptr; } -SArray* getOrderCheckColumns(SQueryAttr* pQuery) { +SArray* getOrderCheckColumns(STaskAttr* pQuery) { int32_t numOfCols = (pQuery->pGroupbyExpr == NULL)? 0: taosArrayGetSize(pQuery->pGroupbyExpr->columnInfo); SArray* pOrderColumns = NULL; @@ -5058,7 +5029,7 @@ SArray* getOrderCheckColumns(SQueryAttr* pQuery) { return pOrderColumns; } -SArray* getResultGroupCheckColumns(SQueryAttr* pQuery) { +SArray* getResultGroupCheckColumns(STaskAttr* pQuery) { int32_t numOfCols = (pQuery->pGroupbyExpr == NULL)? 0 : taosArrayGetSize(pQuery->pGroupbyExpr->columnInfo); SArray* pOrderColumns = NULL; @@ -5109,7 +5080,7 @@ static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) { tfree(pInfo->prevRow); } -SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, +SOperatorInfo* createGlobalAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp) { SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo)); @@ -5177,7 +5148,7 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, return pOperator; } -SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SExprInfo *pExpr, int32_t numOfOutput, +SOperatorInfo *createMultiwaySortOperatorInfo(STaskRuntimeEnv *pRuntimeEnv, SExprInfo *pExpr, int32_t numOfOutput, int32_t numOfRows, void *merger) { SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo)); @@ -5252,9 +5223,9 @@ static SSDataBlock* doSort(void* param, bool* newgroup) { SSDataBlock* pBlock = NULL; while(1) { - publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); - publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); + publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); + pBlock = pOperator->pDownstream[0]->exec(pOperator->pDownstream[0], newgroup); + publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); // start to flush data into disk and try do multiway merge sort if (pBlock == NULL) { @@ -5288,7 +5259,7 @@ static SSDataBlock* doSort(void* param, bool* newgroup) { return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL; } -SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrder* pOrderVal) { +SOperatorInfo *createOrderOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrder* pOrderVal) { SOrderOperatorInfo* pInfo = calloc(1, sizeof(SOrderOperatorInfo)); { @@ -5339,12 +5310,12 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) { SAggOperatorInfo* pAggInfo = pOperator->info; SOptrBasicInfo* pInfo = &pAggInfo->binfo; - SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; + STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; - SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; + STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; int32_t order = pQueryAttr->order.order; - SOperatorInfo* upstream = pOperator->upstream[0]; + SOperatorInfo* upstream = pOperator->pDownstream[0]; while(1) { publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); @@ -5386,7 +5357,7 @@ static SSDataBlock* doSTableAggregate(void* param, bool* newgroup) { SAggOperatorInfo* pAggInfo = pOperator->info; SOptrBasicInfo* pInfo = &pAggInfo->binfo; - SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; + STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; if (pOperator->status == OP_RES_TO_RETURN) { toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->pRes); @@ -5398,10 +5369,10 @@ static SSDataBlock* doSTableAggregate(void* param, bool* newgroup) { return pInfo->pRes; } - SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; + STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; int32_t order = pQueryAttr->order.order; - SOperatorInfo* upstream = pOperator->upstream[0]; + SOperatorInfo* upstream = pOperator->pDownstream[0]; while(1) { publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); @@ -5455,7 +5426,7 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { SOperatorInfo* pOperator = (SOperatorInfo*) param; SProjectOperatorInfo* pProjectInfo = pOperator->info; - SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; + STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; SOptrBasicInfo *pInfo = &pProjectInfo->binfo; SSDataBlock* pRes = pInfo->pRes; @@ -5493,9 +5464,9 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { bool prevVal = *newgroup; // The upstream exec may change the value of the newgroup, so use a local variable instead. - publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); - publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); + publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); + SSDataBlock* pBlock = pOperator->pDownstream[0]->exec(pOperator->pDownstream[0], newgroup); + publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { assert(*newgroup == false); @@ -5547,13 +5518,13 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) { } SLimitOperatorInfo* pInfo = pOperator->info; - SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; + STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; SSDataBlock* pBlock = NULL; while (1) { - publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); - publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); + publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); + pBlock = pOperator->pDownstream[0]->exec(pOperator->pDownstream[0], newgroup); + publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { doSetOperatorCompleted(pOperator); @@ -5599,12 +5570,12 @@ static SSDataBlock* doFilter(void* param, bool* newgroup) { } SFilterOperatorInfo* pCondInfo = pOperator->info; - SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; + STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; while (1) { - publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock *pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); - publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); + publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); + SSDataBlock *pBlock = pOperator->pDownstream[0]->exec(pOperator->pDownstream[0], newgroup); + publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { break; @@ -5631,7 +5602,7 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) { STableIntervalOperatorInfo* pIntervalInfo = pOperator->info; - SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; + STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; if (pOperator->status == OP_RES_TO_RETURN) { toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { @@ -5641,11 +5612,11 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) { return pIntervalInfo->pRes; } - SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; + STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; int32_t order = pQueryAttr->order.order; STimeWindow win = pQueryAttr->window; - SOperatorInfo* upstream = pOperator->upstream[0]; + SOperatorInfo* upstream = pOperator->pDownstream[0]; while(1) { publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); @@ -5690,7 +5661,7 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) { STableIntervalOperatorInfo* pIntervalInfo = pOperator->info; - SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; + STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; if (pOperator->status == OP_RES_TO_RETURN) { toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); @@ -5701,11 +5672,11 @@ static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) { return pIntervalInfo->pRes; } - SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; + STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; int32_t order = pQueryAttr->order.order; STimeWindow win = pQueryAttr->window; - SOperatorInfo* upstream = pOperator->upstream[0]; + SOperatorInfo* upstream = pOperator->pDownstream[0]; while(1) { publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); @@ -5749,7 +5720,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { } STableIntervalOperatorInfo* pIntervalInfo = pOperator->info; - SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; + STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; if (pOperator->status == OP_RES_TO_RETURN) { int64_t st = taosGetTimestampUs(); @@ -5765,10 +5736,10 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { return pIntervalInfo->pRes; } - SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; + STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; int32_t order = pQueryAttr->order.order; - SOperatorInfo* upstream = pOperator->upstream[0]; + SOperatorInfo* upstream = pOperator->pDownstream[0]; while(1) { publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); @@ -5809,7 +5780,7 @@ static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) { } STableIntervalOperatorInfo* pIntervalInfo = pOperator->info; - SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; + STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; if (pOperator->status == OP_RES_TO_RETURN) { copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset); @@ -5820,10 +5791,10 @@ static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) { return pIntervalInfo->pRes; } - SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; + STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; int32_t order = pQueryAttr->order.order; - SOperatorInfo* upstream = pOperator->upstream[0]; + SOperatorInfo* upstream = pOperator->pDownstream[0]; while(1) { publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); @@ -5862,7 +5833,7 @@ static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) { } static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo *pInfo, SSDataBlock *pSDataBlock) { - SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; + STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; STableQueryInfo* item = pRuntimeEnv->current; SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, pInfo->colIndex); @@ -5945,7 +5916,7 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) { SStateWindowOperatorInfo* pWindowInfo = pOperator->info; SOptrBasicInfo* pBInfo = &pWindowInfo->binfo; - SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; + STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; if (pOperator->status == OP_RES_TO_RETURN) { toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes); @@ -5956,10 +5927,10 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) { return pBInfo->pRes; } - SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; + STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; int32_t order = pQueryAttr->order.order; STimeWindow win = pQueryAttr->window; - SOperatorInfo* upstream = pOperator->upstream[0]; + SOperatorInfo* upstream = pOperator->pDownstream[0]; while (1) { publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); SSDataBlock* pBlock = upstream->exec(upstream, newgroup); @@ -6004,7 +5975,7 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) { SOptrBasicInfo* pBInfo = &pWindowInfo->binfo; - SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; + STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; if (pOperator->status == OP_RES_TO_RETURN) { toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes); @@ -6015,12 +5986,12 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) { return pBInfo->pRes; } - SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; + STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; //pQueryAttr->order.order = TSDB_ORDER_ASC; int32_t order = pQueryAttr->order.order; STimeWindow win = pQueryAttr->window; - SOperatorInfo* upstream = pOperator->upstream[0]; + SOperatorInfo* upstream = pOperator->pDownstream[0]; while(1) { publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); @@ -6062,7 +6033,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) { SGroupbyOperatorInfo *pInfo = pOperator->info; - SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; + STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; if (pOperator->status == OP_RES_TO_RETURN) { toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes); @@ -6073,7 +6044,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) { return pInfo->binfo.pRes; } - SOperatorInfo* upstream = pOperator->upstream[0]; + SOperatorInfo* upstream = pOperator->pDownstream[0]; while(1) { publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); @@ -6117,7 +6088,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) { return pInfo->binfo.pRes; } -static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo *pInfo, SQueryRuntimeEnv* pRuntimeEnv, bool* newgroup) { +static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo *pInfo, STaskRuntimeEnv* pRuntimeEnv, bool* newgroup) { pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows; int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)?pRuntimeEnv->pQueryAttr->window.ekey:pInfo->existNewGroupBlock->info.window.ekey; taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo)); @@ -6130,7 +6101,7 @@ static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo *pInfo, SQueryR *newgroup = true; } -static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, SQueryRuntimeEnv *pRuntimeEnv, bool *newgroup) { +static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, STaskRuntimeEnv *pRuntimeEnv, bool *newgroup) { if (taosFillHasMoreResults(pInfo->pFillInfo)) { *newgroup = false; doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity, pInfo->p); @@ -6155,16 +6126,16 @@ static SSDataBlock* doFill(void* param, bool* newgroup) { return NULL; } - SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; + STaskRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; doHandleRemainBlockFromNewGroup(pInfo, pRuntimeEnv, newgroup); if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || (!pInfo->multigroupResult && pInfo->pRes->info.rows > 0)) { return pInfo->pRes; } while(1) { - publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); - publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); + publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); + SSDataBlock* pBlock = pOperator->pDownstream[0]->exec(pOperator->pDownstream[0], newgroup); + publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); if (*newgroup) { assert(pBlock != NULL); @@ -6220,7 +6191,7 @@ static SSDataBlock* doFill(void* param, bool* newgroup) { } // todo set the attribute of query scan count -static int32_t getNumOfScanTimes(SQueryAttr* pQueryAttr) { +static int32_t getNumOfScanTimes(STaskAttr* pQueryAttr) { for(int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) { int32_t functionId = getExprFunctionId(&pQueryAttr->pExpr1[i]); if (functionId == FUNCTION_STDDEV || functionId == FUNCTION_PERCT) { @@ -6240,12 +6211,12 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { pOperator->cleanup(pOperator->info, pOperator->numOfOutput); } - if (pOperator->upstream != NULL) { + if (pOperator->pDownstream != NULL) { for(int32_t i = 0; i < pOperator->numOfUpstream; ++i) { - destroyOperatorInfo(pOperator->upstream[i]); + destroyOperatorInfo(pOperator->pDownstream[i]); } - tfree(pOperator->upstream); + tfree(pOperator->pDownstream); pOperator->numOfUpstream = 0; } @@ -6253,10 +6224,10 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { tfree(pOperator); } -SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { +SOperatorInfo* createAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); - SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; + STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; int32_t numOfRows = (int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows); @@ -6353,7 +6324,7 @@ static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) { pInfo->pRes = destroyOutputBuf(pInfo->pRes); } -SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { +SOperatorInfo* createMultiTableAggOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); size_t tableGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv); @@ -6379,7 +6350,7 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SO return pOperator; } -SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { +SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SProjectOperatorInfo* pInfo = calloc(1, sizeof(SProjectOperatorInfo)); pInfo->seed = rand(); @@ -6442,7 +6413,7 @@ SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int3 return 0; } -SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, +SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter) { SFilterOperatorInfo* pInfo = calloc(1, sizeof(SFilterOperatorInfo)); @@ -6467,7 +6438,7 @@ SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator return pOperator; } -SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) { +SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream) { SLimitOperatorInfo* pInfo = calloc(1, sizeof(SLimitOperatorInfo)); pInfo->limit = pRuntimeEnv->pQueryAttr->limit.limit; @@ -6485,7 +6456,7 @@ SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI return pOperator; } -SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { +SOperatorInfo* createTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); @@ -6510,7 +6481,7 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp } -SOperatorInfo* createAllTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { +SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); @@ -6534,7 +6505,7 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, return pOperator; } -SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { +SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SStateWindowOperatorInfo* pInfo = calloc(1, sizeof(SStateWindowOperatorInfo)); pInfo->colIndex = -1; pInfo->reptScan = false; @@ -6557,7 +6528,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe appendUpstream(pOperator, upstream); return pOperator; } -SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { +SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SSWindowOperatorInfo* pInfo = calloc(1, sizeof(SSWindowOperatorInfo)); pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); @@ -6583,7 +6554,7 @@ SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato return pOperator; } -SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { +SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); @@ -6607,7 +6578,7 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRunti return pOperator; } -SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { +SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset); @@ -6633,14 +6604,14 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRu } -SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { +SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SGroupbyOperatorInfo* pInfo = calloc(1, sizeof(SGroupbyOperatorInfo)); pInfo->colIndex = -1; // group by column index pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); - SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; + STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; pQueryAttr->resultRowSize = (pQueryAttr->resultRowSize * (int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery))); @@ -6664,13 +6635,13 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato return pOperator; } -SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult) { +SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult) { SFillOperatorInfo* pInfo = calloc(1, sizeof(SFillOperatorInfo)); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); pInfo->multigroupResult = multigroupResult; { - SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; + STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; struct SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfOutput, pQueryAttr->fillVal); STimeWindow w = TSWINDOW_INITIALIZER; @@ -6703,10 +6674,10 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn return pOperator; } -SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* pMerger, bool multigroupResult) { +SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* pMerger, bool multigroupResult) { SSLimitOperatorInfo* pInfo = calloc(1, sizeof(SSLimitOperatorInfo)); - SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; + STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; pInfo->orderColumnList = getResultGroupCheckColumns(pQueryAttr); pInfo->slimit = pQueryAttr->slimit; @@ -6758,7 +6729,7 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) { return NULL; } - SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; + STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; int32_t maxNumOfTables = (int32_t)pRuntimeEnv->resultInfo.capacity; STagScanInfo *pInfo = pOperator->info; @@ -6770,7 +6741,7 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) { int32_t functionId = getExprFunctionId(&pOperator->pExpr[0]); if (functionId == FUNCTION_TID_TAG) { // return the tags & table Id - SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; + STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; assert(pQueryAttr->numOfOutput == 1); SExprInfo* pExprInfo = &pOperator->pExpr[0]; @@ -6883,7 +6854,7 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) { #endif } -SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput) { +SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput) { STagScanInfo* pInfo = calloc(1, sizeof(STagScanInfo)); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); @@ -6968,9 +6939,9 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) { SSDataBlock* pBlock = NULL; while(1) { - publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); - publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); + publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); + pBlock = pOperator->pDownstream[0]->exec(pOperator->pDownstream[0], newgroup); + publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { doSetOperatorCompleted(pOperator); @@ -7021,7 +6992,7 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) { return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL; } -SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { +SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SDistinctOperatorInfo* pInfo = calloc(1, sizeof(SDistinctOperatorInfo)); pInfo->totalBytes = 0; pInfo->buf = NULL; @@ -7207,7 +7178,7 @@ static int32_t deserializeColFilterInfo(SColumnFilterInfo* pColFilters, int16_t * @param pExpr * @return */ -int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) { +int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, STaskParam* param) { int32_t code = TSDB_CODE_SUCCESS; // if (taosCheckVersion(pQueryMsg->version, version, 3) != 0) { @@ -7924,7 +7895,7 @@ void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFil return NULL; } -int32_t createFilterInfo(SQueryAttr* pQueryAttr, uint64_t qId) { +int32_t createFilterInfo(STaskAttr* pQueryAttr, uint64_t qId) { for (int32_t i = 0; i < pQueryAttr->numOfCols; ++i) { // if (pQueryAttr->tableCols[i].flist.numOfFilters > 0 && pQueryAttr->tableCols[i].flist.filterInfo != NULL) { // pQueryAttr->numOfFilterCols++; @@ -7943,7 +7914,7 @@ int32_t createFilterInfo(SQueryAttr* pQueryAttr, uint64_t qId) { return TSDB_CODE_SUCCESS; } -static void doUpdateExprColumnIndex(SQueryAttr *pQueryAttr) { +static void doUpdateExprColumnIndex(STaskAttr *pQueryAttr) { assert(pQueryAttr->pExpr1 != NULL && pQueryAttr != NULL); for (int32_t k = 0; k < pQueryAttr->numOfOutput; ++k) { @@ -7980,7 +7951,7 @@ static void doUpdateExprColumnIndex(SQueryAttr *pQueryAttr) { } } -void setResultBufSize(SQueryAttr* pQueryAttr, SRspResultInfo* pResultInfo) { +void setResultBufSize(STaskAttr* pQueryAttr, SRspResultInfo* pResultInfo) { const int32_t DEFAULT_RESULT_MSG_SIZE = 1024 * (1024 + 512); // the minimum number of rows for projection query @@ -8026,7 +7997,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S // to make sure third party won't overwrite this structure pQInfo->signature = pQInfo; - SQueryAttr* pQueryAttr = &pQInfo->query; + STaskAttr* pQueryAttr = &pQInfo->query; pQInfo->runtimeEnv.pQueryAttr = pQueryAttr; pQueryAttr->tableGroupInfo = *pTableGroupInfo; @@ -8145,7 +8116,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S pQueryAttr->window = pQueryMsg->window; updateDataCheckOrder(pQInfo, pQueryMsg, pQueryAttr->stableQuery); - SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; + STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; STimeWindow window = pQueryAttr->window; int32_t index = 0; @@ -8213,7 +8184,7 @@ _cleanup_qinfo: // filterFreeInfo(pFilters); _cleanup: - freeQInfo(pQInfo); + doDestroyTask(pQInfo); return NULL; } @@ -8231,14 +8202,14 @@ bool isValidQInfo(void *param) { return (sig == (uint64_t)pQInfo); } -int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, SQueryParam* param, char* start, +int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, STaskParam* param, char* start, int32_t prevResultLen, void* merger) { int32_t code = TSDB_CODE_SUCCESS; - SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; + STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; pRuntimeEnv->qinfo = pQInfo; - SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; + STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; STSBuf *pTsBuf = NULL; @@ -8292,7 +8263,7 @@ int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* _error: // table query ref will be decrease during error handling - freeQInfo(pQInfo); + doDestroyTask(pQInfo); return code; } @@ -8373,20 +8344,20 @@ void* freeColumnInfo(SColumnInfo* pColumnInfo, int32_t numOfCols) { return NULL; } -void freeQInfo(SQInfo *pQInfo) { +void doDestroyTask(SQInfo *pQInfo) { if (!isValidQInfo(pQInfo)) { return; } //qDebug("QInfo:0x%"PRIx64" start to free QInfo", pQInfo->qId); - SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; + STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; releaseQueryBuf(pRuntimeEnv->tableqinfoGroupInfo.numOfTables); doDestroyTableQueryInfo(&pRuntimeEnv->tableqinfoGroupInfo); teardownQueryRuntimeEnv(&pQInfo->runtimeEnv); - SQueryAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr; + STaskAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr; freeQueryAttr(pQueryAttr); // tsdbDestroyTableGroup(&pQueryAttr->tableGroupInfo); @@ -8407,8 +8378,8 @@ void freeQInfo(SQInfo *pQInfo) { int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int8_t compressed, int32_t *compLen) { // the remained number of retrieved rows, not the interpolated result - SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; - SQueryAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr; + STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; + STaskAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr; // load data from file to msg buffer if (pQueryAttr->tsCompQuery) { @@ -8535,8 +8506,8 @@ int32_t checkForQueryBuf(size_t numOfTables) { } bool checkNeedToCompressQueryCol(SQInfo *pQInfo) { - SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; - SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; + STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; + STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; SSDataBlock* pRes = pRuntimeEnv->outputBuf; @@ -8569,7 +8540,7 @@ void releaseQueryBuf(size_t numOfTables) { atomic_add_fetch_64(&tsQueryBufferSizeBytes, t); } -void freeQueryAttr(SQueryAttr* pQueryAttr) { +void freeQueryAttr(STaskAttr* pQueryAttr) { if (pQueryAttr != NULL) { if (pQueryAttr->fillVal != NULL) { tfree(pQueryAttr->fillVal); diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index a7ec39bfde..ae60ad037f 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -126,7 +126,6 @@ typedef struct SSchJob { #define SCH_LOCK(type, _lock) (SCH_READ == (type) ? taosRLockLatch(_lock) : taosWLockLatch(_lock)) #define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock)) - extern int32_t schLaunchTask(SSchJob *job, SSchTask *task); extern int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, int32_t msgType); -- GitLab