From da0d9c78efa5e0fbfa05865c2273a325043d86f9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 28 Apr 2023 11:42:34 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/dnode/vnode/src/tq/tqRestore.c | 2 +- source/dnode/vnode/src/tq/tqScan.c | 1 + source/libs/executor/inc/executorInt.h | 644 +++++++++++++++- source/libs/executor/inc/executorimpl.h | 699 ------------------ source/libs/executor/inc/operator.h | 1 + source/libs/executor/inc/querytask.h | 29 + source/libs/executor/src/aggregateoperator.c | 10 +- source/libs/executor/src/cachescanoperator.c | 6 +- source/libs/executor/src/dataDeleter.c | 2 +- source/libs/executor/src/dataDispatcher.c | 2 +- source/libs/executor/src/dataInserter.c | 2 +- .../libs/executor/src/eventwindowoperator.c | 6 +- source/libs/executor/src/exchangeoperator.c | 16 +- source/libs/executor/src/executil.c | 4 +- source/libs/executor/src/executor.c | 30 +- .../src/{executorimpl.c => executorInt.c} | 59 +- source/libs/executor/src/filloperator.c | 2 +- source/libs/executor/src/groupoperator.c | 5 +- source/libs/executor/src/joinoperator.c | 6 +- source/libs/executor/src/operator.c | 120 ++- source/libs/executor/src/projectoperator.c | 2 +- source/libs/executor/src/querytask.c | 6 +- source/libs/executor/src/scanoperator.c | 2 +- source/libs/executor/src/sortoperator.c | 4 +- source/libs/executor/src/sysscanoperator.c | 2 +- source/libs/executor/src/tfill.c | 2 +- source/libs/executor/src/timesliceoperator.c | 6 +- source/libs/executor/src/timewindowoperator.c | 12 +- source/libs/executor/test/executorTests.cpp | 4 +- source/libs/executor/test/lhashTests.cpp | 2 +- source/libs/executor/test/sortTests.cpp | 2 +- 31 files changed, 814 insertions(+), 876 deletions(-) delete mode 100644 source/libs/executor/inc/executorimpl.h rename source/libs/executor/src/{executorimpl.c => executorInt.c} (95%) diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index c164d037e0..8cda5a21c9 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -107,7 +107,7 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { if (streamTaskShouldStop(&pTask->status) || status == TASK_STATUS__RECOVER_PREPARE || status == TASK_STATUS__WAIT_DOWNSTREAM) { - tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr, status); + tqDebug("s-task:%s skip push data, not ready for processing, status:%d", pTask->id.idStr, status); streamMetaReleaseTask(pStreamMeta, pTask); continue; } diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 0f00a5acb8..8e243a8bd1 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -130,6 +130,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta tqError("vgId:%d, task exec error since %s", pTq->pVnode->config.vgId, terrstr()); return -1; } + tqDebug("tmqsnap task execute end, get %p", pDataBlock); if (pDataBlock != NULL && pDataBlock->info.rows > 0) { diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index d22a7460bb..a4f1e2ef94 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -12,14 +12,75 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ - -#ifndef _TD_EXECUTOR_INT_H -#define _TD_EXECUTOR_INT_H +#ifndef TDENGINE_EXECUTORINT_H +#define TDENGINE_EXECUTORINT_H #ifdef __cplusplus extern "C" { #endif +#include "os.h" +#include "tcommon.h" +#include "tlosertree.h" +#include "tsort.h" +#include "ttszip.h" +#include "tvariant.h" + +#include "dataSinkMgt.h" +#include "executil.h" +#include "executor.h" +#include "planner.h" +#include "scalar.h" +#include "taosdef.h" +#include "tarray.h" +#include "tfill.h" +#include "thash.h" +#include "tlockfree.h" +#include "tmsg.h" +#include "tpagedbuf.h" +#include "tstream.h" +#include "tstreamUpdate.h" + +#include "vnode.h" + +typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order); + +#define IS_VALID_SESSION_WIN(winInfo) ((winInfo).sessionWin.win.skey > 0) +#define SET_SESSION_WIN_INVALID(winInfo) ((winInfo).sessionWin.win.skey = INT64_MIN) +#define IS_INVALID_SESSION_WIN_KEY(winKey) ((winKey).win.skey <= 0) +#define SET_SESSION_WIN_KEY_INVALID(pWinKey) ((pWinKey)->win.skey = INT64_MIN) + +/** + * If the number of generated results is greater than this value, + * query query will be halt and return results to client immediate. + */ +typedef struct SResultInfo { // TODO refactor + int64_t totalRows; // total generated result size in rows + int64_t totalBytes; // total results in bytes. + int32_t capacity; // capacity of current result output buffer + int32_t threshold; // result size threshold in rows. +} SResultInfo; + +typedef struct STableQueryInfo { + TSKEY lastKey; // last check ts, todo remove it later + SResultRowPosition pos; // current active time window +} STableQueryInfo; + +typedef struct SLimit { + int64_t limit; + int64_t offset; +} SLimit; + +typedef struct STableScanAnalyzeInfo SFileBlockLoadRecorder; + +enum { + STREAM_RECOVER_STEP__NONE = 0, + STREAM_RECOVER_STEP__PREPARE1, + STREAM_RECOVER_STEP__PREPARE2, + STREAM_RECOVER_STEP__SCAN1, + STREAM_RECOVER_STEP__SCAN2, +}; + extern int32_t exchangeObjRefPool; typedef struct { @@ -29,9 +90,584 @@ typedef struct { int32_t bytes; } SGroupKeys, SStateKeys; +typedef struct { + char* tablename; + char* dbname; + int32_t tversion; + SSchemaWrapper* sw; + SSchemaWrapper* qsw; +} SSchemaInfo; + +typedef struct SExchangeOpStopInfo { + int32_t operatorType; + int64_t refId; +} SExchangeOpStopInfo; + +typedef struct SExprSupp { + SExprInfo* pExprInfo; + int32_t numOfExprs; // the number of scalar expression in group operator + SqlFunctionCtx* pCtx; + int32_t* rowEntryInfoOffset; // offset value for each row result cell info + SFilterInfo* pFilterInfo; +} SExprSupp; + +typedef enum { + EX_SOURCE_DATA_NOT_READY = 0x1, + EX_SOURCE_DATA_READY = 0x2, + EX_SOURCE_DATA_EXHAUSTED = 0x3, +} EX_SOURCE_STATUS; + +#define COL_MATCH_FROM_COL_ID 0x1 +#define COL_MATCH_FROM_SLOT_ID 0x2 + +typedef struct SLoadRemoteDataInfo { + uint64_t totalSize; // total load bytes from remote + uint64_t totalRows; // total number of rows + uint64_t totalElapsed; // total elapsed time +} SLoadRemoteDataInfo; + +typedef struct SLimitInfo { + SLimit limit; + SLimit slimit; + uint64_t currentGroupId; + int64_t remainGroupOffset; + int64_t numOfOutputGroups; + int64_t remainOffset; + int64_t numOfOutputRows; +} SLimitInfo; + +typedef struct SExchangeInfo { + SArray* pSources; + SArray* pSourceDataInfo; + tsem_t ready; + void* pTransporter; + + // SArray, result block list, used to keep the multi-block that + // passed by downstream operator + SArray* pResultBlockList; + SArray* pRecycledBlocks; // build a pool for small data block to avoid to repeatly create and then destroy. + SSDataBlock* pDummyBlock; // dummy block, not keep data + bool seqLoadData; // sequential load data or not, false by default + int32_t current; + SLoadRemoteDataInfo loadInfo; + uint64_t self; + SLimitInfo limitInfo; + int64_t openedTs; // start exec time stamp, todo: move to SLoadRemoteDataInfo +} SExchangeInfo; + +typedef struct SScanInfo { + int32_t numOfAsc; + int32_t numOfDesc; +} SScanInfo; + +typedef struct SSampleExecInfo { + double sampleRatio; // data block sample ratio, 1 by default + uint32_t seed; // random seed value +} SSampleExecInfo; + +enum { + TABLE_SCAN__TABLE_ORDER = 1, + TABLE_SCAN__BLOCK_ORDER = 2, +}; + +typedef struct SAggSupporter { + SSHashObj* pResultRowHashTable; // quick locate the window object for each result + char* keyBuf; // window key buffer + SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file + int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row + int32_t currentPageId; // current write page id +} SAggSupporter; + +typedef struct { + // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if + // current data block needs to be loaded. + SInterval interval; + SAggSupporter* pAggSup; + SExprSupp* pExprSup; // expr supporter of aggregate operator +} SAggOptrPushDownInfo; + +typedef struct STableMetaCacheInfo { + SLRUCache* pTableMetaEntryCache; // 100 by default + uint64_t metaFetch; + uint64_t cacheHit; +} STableMetaCacheInfo; + +typedef struct STableScanBase { + STsdbReader* dataReader; + SFileBlockLoadRecorder readRecorder; + SQueryTableDataCond cond; + SAggOptrPushDownInfo pdInfo; + SColMatchInfo matchInfo; + SReadHandle readHandle; + SExprSupp pseudoSup; + STableMetaCacheInfo metaCache; + int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan + int32_t dataBlockLoadFlag; + SLimitInfo limitInfo; + // there are more than one table list exists in one task, if only one vnode exists. + STableListInfo* pTableListInfo; +} STableScanBase; + +typedef struct STableScanInfo { + STableScanBase base; + SScanInfo scanInfo; + int32_t scanTimes; + SSDataBlock* pResBlock; + SSampleExecInfo sample; // sample execution info + int32_t currentGroupId; + int32_t currentTable; + int8_t scanMode; + int8_t assignBlockUid; + bool hasGroupByTag; + bool countOnly; +} STableScanInfo; + +typedef struct STableMergeScanInfo { + int32_t tableStartIndex; + int32_t tableEndIndex; + bool hasGroupId; + uint64_t groupId; + SArray* queryConds; // array of queryTableDataCond + STableScanBase base; + int32_t bufPageSize; + uint32_t sortBufSize; // max buffer size for in-memory sort + SArray* pSortInfo; + SSortHandle* pSortHandle; + SSDataBlock* pSortInputBlock; + int64_t startTs; // sort start time + SArray* sortSourceParams; + SLimitInfo limitInfo; + int64_t numOfRows; + SScanInfo scanInfo; + int32_t scanTimes; + SSDataBlock* pResBlock; + SSampleExecInfo sample; // sample execution info + SSortExecInfo sortExecInfo; +} STableMergeScanInfo; + +typedef struct STagScanInfo { + SColumnInfo* pCols; + SSDataBlock* pRes; + SColMatchInfo matchInfo; + int32_t curPos; + SReadHandle readHandle; + STableListInfo* pTableListInfo; +} STagScanInfo; + +typedef enum EStreamScanMode { + STREAM_SCAN_FROM_READERHANDLE = 1, + STREAM_SCAN_FROM_RES, + STREAM_SCAN_FROM_UPDATERES, + STREAM_SCAN_FROM_DELETE_DATA, + STREAM_SCAN_FROM_DATAREADER_RETRIEVE, + STREAM_SCAN_FROM_DATAREADER_RANGE, +} EStreamScanMode; + +enum { + PROJECT_RETRIEVE_CONTINUE = 0x1, + PROJECT_RETRIEVE_DONE = 0x2, +}; + +typedef struct SStreamAggSupporter { + int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row + SSDataBlock* pScanBlock; + SStreamState* pState; + int64_t gap; // stream session window gap + SqlFunctionCtx* pDummyCtx; // for combine + SSHashObj* pResultRows; + int32_t stateKeySize; + int16_t stateKeyType; + SDiskbasedBuf* pResultBuf; +} SStreamAggSupporter; + +typedef struct SWindowSupporter { + SStreamAggSupporter* pStreamAggSup; + int64_t gap; + uint16_t parentType; + SAggSupporter* pIntervalAggSup; +} SWindowSupporter; + +typedef struct SPartitionBySupporter { + SArray* pGroupCols; // group by columns, SArray + SArray* pGroupColVals; // current group column values, SArray + char* keyBuf; // group by keys for hash + bool needCalc; // partition by column +} SPartitionBySupporter; + +typedef struct SPartitionDataInfo { + uint64_t groupId; + char* tbname; + SArray* tags; + SArray* rowIds; +} SPartitionDataInfo; + +typedef struct STimeWindowAggSupp { + int8_t calTrigger; + int8_t calTriggerSaved; + int64_t deleteMark; + int64_t deleteMarkSaved; + int64_t waterMark; + TSKEY maxTs; + TSKEY minTs; + SColumnInfoData timeWindowData; // query time window info for scalar function execution. +} STimeWindowAggSupp; + +typedef struct SStreamScanInfo { + SExprInfo* pPseudoExpr; + int32_t numOfPseudoExpr; + SExprSupp tbnameCalSup; + SExprSupp tagCalSup; + int32_t primaryTsIndex; // primary time stamp slot id + SReadHandle readHandle; + SInterval interval; // if the upstream is an interval operator, the interval info is also kept here. + SColMatchInfo matchInfo; + + SArray* pBlockLists; // multiple SSDatablock. + SSDataBlock* pRes; // result SSDataBlock + SSDataBlock* pUpdateRes; // update SSDataBlock + int32_t updateResIndex; + int32_t blockType; // current block type + int32_t validBlockIndex; // Is current data has returned? + uint64_t numOfExec; // execution times + STqReader* tqReader; + + uint64_t groupId; + SUpdateInfo* pUpdateInfo; + + EStreamScanMode scanMode; + struct SOperatorInfo* pStreamScanOp; + struct SOperatorInfo* pTableScanOp; + SArray* childIds; + SWindowSupporter windowSup; + SPartitionBySupporter partitionSup; + SExprSupp* pPartScalarSup; + bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA. + int32_t scanWinIndex; // for state operator + int32_t pullDataResIndex; + SSDataBlock* pPullDataRes; // pull data SSDataBlock + SSDataBlock* pDeleteDataRes; // delete data SSDataBlock + int32_t deleteDataIndex; + STimeWindow updateWin; + STimeWindowAggSupp twAggSup; + SSDataBlock* pUpdateDataRes; + // status for tmq + SNodeList* pGroupTags; + SNode* pTagCond; + SNode* pTagIndexCond; + + // recover + int32_t blockRecoverContiCnt; + int32_t blockRecoverTotCnt; + SSDataBlock* pRecoverRes; + + SSDataBlock* pCreateTbRes; + int8_t igCheckUpdate; + int8_t igExpired; +} SStreamScanInfo; + +typedef struct { + SVnode* vnode; + SSDataBlock pRes; // result SSDataBlock + STsdbReader* dataReader; + SSnapContext* sContext; + STableListInfo* pTableListInfo; +} SStreamRawScanInfo; + +typedef struct STableCountScanSupp { + int16_t dbNameSlotId; + int16_t stbNameSlotId; + int16_t tbCountSlotId; + bool groupByDbName; + bool groupByStbName; + char dbNameFilter[TSDB_DB_NAME_LEN]; + char stbNameFilter[TSDB_TABLE_NAME_LEN]; +} STableCountScanSupp; + +typedef struct SOptrBasicInfo { + SResultRowInfo resultRowInfo; + SSDataBlock* pRes; + bool mergeResultBlock; +} SOptrBasicInfo; + +typedef struct SIntervalAggOperatorInfo { + SOptrBasicInfo binfo; // basic info + SAggSupporter aggSup; // aggregate supporter + SExprSupp scalarSupp; // supporter for perform scalar function + SGroupResInfo groupResInfo; // multiple results build supporter + SInterval interval; // interval info + int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator. + STimeWindow win; // query time range + bool timeWindowInterpo; // interpolation needed or not + SArray* pInterpCols; // interpolation columns + int32_t resultTsOrder; // result timestamp order + int32_t inputOrder; // input data ts order + EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] + STimeWindowAggSupp twAggSup; + SArray* pPrevValues; // SArray used to keep the previous not null value for interpolation. +} SIntervalAggOperatorInfo; + +typedef struct SMergeAlignedIntervalAggOperatorInfo { + SIntervalAggOperatorInfo* intervalAggOperatorInfo; + + uint64_t groupId; // current groupId + int64_t curTs; // current ts + SSDataBlock* prefetchedBlock; + SResultRow* pResultRow; +} SMergeAlignedIntervalAggOperatorInfo; + +typedef struct SStreamIntervalOperatorInfo { + SOptrBasicInfo binfo; // basic info + SAggSupporter aggSup; // aggregate supporter + SExprSupp scalarSupp; // supporter for perform scalar function + SGroupResInfo groupResInfo; // multiple results build supporter + SInterval interval; // interval info + int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator. + STimeWindowAggSupp twAggSup; + bool invertible; + bool ignoreExpiredData; + bool ignoreExpiredDataSaved; + SArray* pDelWins; // SWinRes + int32_t delIndex; + SSDataBlock* pDelRes; + SPhysiNode* pPhyNode; // create new child + SHashObj* pPullDataMap; + SArray* pPullWins; // SPullWindowInfo + int32_t pullIndex; + SSDataBlock* pPullDataRes; + bool isFinal; + SArray* pChildren; + SStreamState* pState; + SWinKey delKey; + uint64_t numOfDatapack; + SArray* pUpdated; + SSHashObj* pUpdatedMap; + int64_t dataVersion; +} SStreamIntervalOperatorInfo; + +typedef struct SDataGroupInfo { + uint64_t groupId; + int64_t numOfRows; + SArray* pPageList; +} SDataGroupInfo; + +typedef struct SWindowRowsSup { + STimeWindow win; + TSKEY prevTs; + int32_t startRowIndex; + int32_t numOfRows; + uint64_t groupId; +} SWindowRowsSup; + +typedef struct SResultWindowInfo { + void* pOutputBuf; + SSessionKey sessionWin; + bool isOutput; +} SResultWindowInfo; + +typedef struct SStreamSessionAggOperatorInfo { + SOptrBasicInfo binfo; + SStreamAggSupporter streamAggSup; + SExprSupp scalarSupp; // supporter for perform scalar function + SGroupResInfo groupResInfo; + int32_t primaryTsIndex; // primary timestamp slot id + int32_t endTsIndex; // window end timestamp slot id + int32_t order; // current SSDataBlock scan order + STimeWindowAggSupp twAggSup; + SSDataBlock* pWinBlock; // window result + SSDataBlock* pDelRes; // delete result + SSDataBlock* pUpdateRes; // update window + bool returnUpdate; + SSHashObj* pStDeleted; + void* pDelIterator; + SArray* pChildren; // cache for children's result; final stream operator + SPhysiNode* pPhyNode; // create new child + bool isFinal; + bool ignoreExpiredData; + bool ignoreExpiredDataSaved; + SArray* pUpdated; + SSHashObj* pStUpdated; + int64_t dataVersion; +} SStreamSessionAggOperatorInfo; + +typedef struct SStreamStateAggOperatorInfo { + SOptrBasicInfo binfo; + SStreamAggSupporter streamAggSup; + SExprSupp scalarSupp; // supporter for perform scalar function + SGroupResInfo groupResInfo; + int32_t primaryTsIndex; // primary timestamp slot id + STimeWindowAggSupp twAggSup; + SColumn stateCol; + SSDataBlock* pDelRes; + SSHashObj* pSeDeleted; + void* pDelIterator; + SArray* pChildren; // cache for children's result; + bool ignoreExpiredData; + bool ignoreExpiredDataSaved; + SArray* pUpdated; + SSHashObj* pSeUpdated; + int64_t dataVersion; +} SStreamStateAggOperatorInfo; + +typedef struct SStreamPartitionOperatorInfo { + SOptrBasicInfo binfo; + SPartitionBySupporter partitionSup; + SExprSupp scalarSup; + SExprSupp tbnameCalSup; + SExprSupp tagCalSup; + SHashObj* pPartitions; + void* parIte; + void* pTbNameIte; + SSDataBlock* pInputDataBlock; + int32_t tsColIndex; + SSDataBlock* pDelRes; + SSDataBlock* pCreateTbRes; +} SStreamPartitionOperatorInfo; + +typedef struct SStreamFillSupporter { + int32_t type; // fill type + SInterval interval; + SResultRowData prev; + SResultRowData cur; + SResultRowData next; + SResultRowData nextNext; + SFillColInfo* pAllColInfo; // fill exprs and not fill exprs + SExprSupp notFillExprSup; + int32_t numOfAllCols; // number of all exprs, including the tags columns + int32_t numOfFillCols; + int32_t numOfNotFillCols; + int32_t rowSize; + SSHashObj* pResMap; + bool hasDelete; +} SStreamFillSupporter; + +typedef struct SStreamFillOperatorInfo { + SStreamFillSupporter* pFillSup; + SSDataBlock* pRes; + SSDataBlock* pSrcBlock; + int32_t srcRowIndex; + SSDataBlock* pSrcDelBlock; + int32_t srcDelRowIndex; + SSDataBlock* pDelRes; + SColMatchInfo matchInfo; + int32_t primaryTsCol; + int32_t primarySrcSlotId; + SStreamFillInfo* pFillInfo; +} SStreamFillOperatorInfo; + +#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED) +#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED) + +SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode); +int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, const char* dbName, SExecTaskInfo* pTaskInfo); +void cleanupQueriedTableScanInfo(SSchemaInfo* pSchemaInfo); + +void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock); +void cleanupBasicInfo(SOptrBasicInfo* pInfo); + +int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr); +void cleanupExprSupp(SExprSupp* pSup); + +void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs); + +int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, + const char* pkey, void* pState); +void cleanupAggSup(SAggSupporter* pAggSup); + +void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows); + +void doBuildStreamResBlock(struct SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, + SDiskbasedBuf* pBuf); +void doBuildResultDatablock(struct SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, + SDiskbasedBuf* pBuf); + +bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo); +bool hasSlimitOffsetInfo(SLimitInfo* pLimitInfo); +void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo); +void resetLimitInfoForNextGroup(SLimitInfo* pLimitInfo); +bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); + +void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, + int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput); + +int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart); +void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs, + struct SOperatorInfo* pOperator); + +STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order); +int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz); + +extern void doDestroyExchangeOperatorInfo(void* param); + +void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo); +int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock, + int32_t rows, const char* idStr, STableMetaCacheInfo* pCache); + +void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle); +void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name); + +void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset); +void clearResultRowInitFlag(SqlFunctionCtx* pCtx, int32_t numOfOutput); + +SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData, + int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo, + bool isIntervalQuery, SAggSupporter* pSup, bool keepGroup); + +int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, + int32_t numOfOutput, SArray* pPseudoList); + +void setInputDataBlock(SExprSupp* pExprSupp, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol); + +int32_t checkForQueryBuf(size_t numOfTables); + +int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo* pTask, SReadHandle* readHandle); + +STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval, + int32_t order); +int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey, + __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order); +int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); +SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize); +void getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId, SSessionKey* pKey); +bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap); +bool functionNeedToExecute(SqlFunctionCtx* pCtx); +bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup); +bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup); +bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup); +void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, + uint64_t* pGp, void* pTbName); +uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId); + +int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup, + SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); + +bool groupbyTbname(SNodeList* pGroupList); +int32_t buildDataBlockFromGroupRes(struct SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup, + SGroupResInfo* pGroupResInfo); +int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size); +int32_t buildSessionResultDataBlock(struct SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, + SExprSupp* pSup, SGroupResInfo* pGroupResInfo); +int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId, + SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup); +int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult); +int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize); +void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order); +int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos, int32_t order, + int64_t* pData); +void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, uint64_t groupId, + SSDataBlock* pSrcBlock, int32_t rowId, SSDataBlock* pDestBlock); + +SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag); +SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs); + +void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx, + SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo); +void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset); +void doClearBufferedBlocks(SStreamScanInfo* pInfo); + uint64_t calcGroupId(char* pData, int32_t len); + #ifdef __cplusplus } #endif -#endif /*_TD_EXECUTOR_INT_H*/ \ No newline at end of file +#endif // TDENGINE_EXECUTORINT_H diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h deleted file mode 100644 index 4d19b8bb76..0000000000 --- a/source/libs/executor/inc/executorimpl.h +++ /dev/null @@ -1,699 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -#ifndef TDENGINE_EXECUTORIMPL_H -#define TDENGINE_EXECUTORIMPL_H - -#ifdef __cplusplus -extern "C" { -#endif - -#include "os.h" -#include "tcommon.h" -#include "tlosertree.h" -#include "tsort.h" -#include "ttszip.h" -#include "tvariant.h" - -#include "dataSinkMgt.h" -#include "executil.h" -#include "executor.h" -#include "planner.h" -#include "scalar.h" -#include "taosdef.h" -#include "tarray.h" -#include "tfill.h" -#include "thash.h" -#include "tlockfree.h" -#include "tmsg.h" -#include "tpagedbuf.h" -#include "tstream.h" -#include "tstreamUpdate.h" - -#include "executorInt.h" -#include "vnode.h" - -typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order); - -#define IS_VALID_SESSION_WIN(winInfo) ((winInfo).sessionWin.win.skey > 0) -#define SET_SESSION_WIN_INVALID(winInfo) ((winInfo).sessionWin.win.skey = INT64_MIN) -#define IS_INVALID_SESSION_WIN_KEY(winKey) ((winKey).win.skey <= 0) -#define SET_SESSION_WIN_KEY_INVALID(pWinKey) ((pWinKey)->win.skey = INT64_MIN) - -enum { - // when this task starts to execute, this status will set - TASK_NOT_COMPLETED = 0x1u, - - /* Task is over - * 1. this status is used in one row result query process, e.g., count/sum/first/last/ avg...etc. - * 2. when all data within queried time window, it is also denoted as query_completed - */ - TASK_COMPLETED = 0x2u, -}; - -/** - * If the number of generated results is greater than this value, - * query query will be halt and return results to client immediate. - */ -typedef struct SResultInfo { // TODO refactor - int64_t totalRows; // total generated result size in rows - int64_t totalBytes; // total results in bytes. - int32_t capacity; // capacity of current result output buffer - int32_t threshold; // result size threshold in rows. -} SResultInfo; - -typedef struct STableQueryInfo { - TSKEY lastKey; // last check ts, todo remove it later - SResultRowPosition pos; // current active time window -} STableQueryInfo; - -typedef struct SLimit { - int64_t limit; - int64_t offset; -} SLimit; - -typedef struct STableScanAnalyzeInfo SFileBlockLoadRecorder; - -enum { - STREAM_RECOVER_STEP__NONE = 0, - STREAM_RECOVER_STEP__PREPARE1, - STREAM_RECOVER_STEP__PREPARE2, - STREAM_RECOVER_STEP__SCAN1, - STREAM_RECOVER_STEP__SCAN2, -}; - -typedef struct { - STqOffsetVal currentOffset; // for tmq - SMqMetaRsp metaRsp; // for tmq fetching meta - int64_t snapshotVer; - SPackedData submit; - SSchemaWrapper* schema; - char tbName[TSDB_TABLE_NAME_LEN]; - int8_t recoverStep; - int8_t recoverScanFinished; - SQueryTableDataCond tableCond; - int64_t fillHistoryVer1; - int64_t fillHistoryVer2; - SStreamState* pState; - int64_t dataVersion; - int64_t checkPointId; -} SStreamTaskInfo; - -typedef struct { - char* tablename; - char* dbname; - int32_t tversion; - SSchemaWrapper* sw; - SSchemaWrapper* qsw; -} SSchemaInfo; - -typedef struct SExchangeOpStopInfo { - int32_t operatorType; - int64_t refId; -} SExchangeOpStopInfo; - -typedef struct SExprSupp { - SExprInfo* pExprInfo; - int32_t numOfExprs; // the number of scalar expression in group operator - SqlFunctionCtx* pCtx; - int32_t* rowEntryInfoOffset; // offset value for each row result cell info - SFilterInfo* pFilterInfo; -} SExprSupp; - -typedef enum { - EX_SOURCE_DATA_NOT_READY = 0x1, - EX_SOURCE_DATA_READY = 0x2, - EX_SOURCE_DATA_EXHAUSTED = 0x3, -} EX_SOURCE_STATUS; - -#define COL_MATCH_FROM_COL_ID 0x1 -#define COL_MATCH_FROM_SLOT_ID 0x2 - -typedef struct SLoadRemoteDataInfo { - uint64_t totalSize; // total load bytes from remote - uint64_t totalRows; // total number of rows - uint64_t totalElapsed; // total elapsed time -} SLoadRemoteDataInfo; - -typedef struct SLimitInfo { - SLimit limit; - SLimit slimit; - uint64_t currentGroupId; - int64_t remainGroupOffset; - int64_t numOfOutputGroups; - int64_t remainOffset; - int64_t numOfOutputRows; -} SLimitInfo; - -typedef struct SExchangeInfo { - SArray* pSources; - SArray* pSourceDataInfo; - tsem_t ready; - void* pTransporter; - - // SArray, result block list, used to keep the multi-block that - // passed by downstream operator - SArray* pResultBlockList; - SArray* pRecycledBlocks; // build a pool for small data block to avoid to repeatly create and then destroy. - SSDataBlock* pDummyBlock; // dummy block, not keep data - bool seqLoadData; // sequential load data or not, false by default - int32_t current; - SLoadRemoteDataInfo loadInfo; - uint64_t self; - SLimitInfo limitInfo; - int64_t openedTs; // start exec time stamp, todo: move to SLoadRemoteDataInfo -} SExchangeInfo; - -typedef struct SScanInfo { - int32_t numOfAsc; - int32_t numOfDesc; -} SScanInfo; - -typedef struct SSampleExecInfo { - double sampleRatio; // data block sample ratio, 1 by default - uint32_t seed; // random seed value -} SSampleExecInfo; - -enum { - TABLE_SCAN__TABLE_ORDER = 1, - TABLE_SCAN__BLOCK_ORDER = 2, -}; - -typedef struct SAggSupporter { - SSHashObj* pResultRowHashTable; // quick locate the window object for each result - char* keyBuf; // window key buffer - SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file - int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row - int32_t currentPageId; // current write page id -} SAggSupporter; - -typedef struct { - // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if - // current data block needs to be loaded. - SInterval interval; - SAggSupporter* pAggSup; - SExprSupp* pExprSup; // expr supporter of aggregate operator -} SAggOptrPushDownInfo; - -typedef struct STableMetaCacheInfo { - SLRUCache* pTableMetaEntryCache; // 100 by default - uint64_t metaFetch; - uint64_t cacheHit; -} STableMetaCacheInfo; - -typedef struct STableScanBase { - STsdbReader* dataReader; - SFileBlockLoadRecorder readRecorder; - SQueryTableDataCond cond; - SAggOptrPushDownInfo pdInfo; - SColMatchInfo matchInfo; - SReadHandle readHandle; - SExprSupp pseudoSup; - STableMetaCacheInfo metaCache; - int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan - int32_t dataBlockLoadFlag; - SLimitInfo limitInfo; - // there are more than one table list exists in one task, if only one vnode exists. - STableListInfo* pTableListInfo; -} STableScanBase; - -typedef struct STableScanInfo { - STableScanBase base; - SScanInfo scanInfo; - int32_t scanTimes; - SSDataBlock* pResBlock; - SSampleExecInfo sample; // sample execution info - int32_t currentGroupId; - int32_t currentTable; - int8_t scanMode; - int8_t assignBlockUid; - bool hasGroupByTag; - bool countOnly; -} STableScanInfo; - -typedef struct STableMergeScanInfo { - int32_t tableStartIndex; - int32_t tableEndIndex; - bool hasGroupId; - uint64_t groupId; - SArray* queryConds; // array of queryTableDataCond - STableScanBase base; - int32_t bufPageSize; - uint32_t sortBufSize; // max buffer size for in-memory sort - SArray* pSortInfo; - SSortHandle* pSortHandle; - SSDataBlock* pSortInputBlock; - int64_t startTs; // sort start time - SArray* sortSourceParams; - SLimitInfo limitInfo; - int64_t numOfRows; - SScanInfo scanInfo; - int32_t scanTimes; - SSDataBlock* pResBlock; - SSampleExecInfo sample; // sample execution info - SSortExecInfo sortExecInfo; -} STableMergeScanInfo; - -typedef struct STagScanInfo { - SColumnInfo* pCols; - SSDataBlock* pRes; - SColMatchInfo matchInfo; - int32_t curPos; - SReadHandle readHandle; - STableListInfo* pTableListInfo; -} STagScanInfo; - -typedef enum EStreamScanMode { - STREAM_SCAN_FROM_READERHANDLE = 1, - STREAM_SCAN_FROM_RES, - STREAM_SCAN_FROM_UPDATERES, - STREAM_SCAN_FROM_DELETE_DATA, - STREAM_SCAN_FROM_DATAREADER_RETRIEVE, - STREAM_SCAN_FROM_DATAREADER_RANGE, -} EStreamScanMode; - -enum { - PROJECT_RETRIEVE_CONTINUE = 0x1, - PROJECT_RETRIEVE_DONE = 0x2, -}; - -typedef struct SStreamAggSupporter { - int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row - SSDataBlock* pScanBlock; - SStreamState* pState; - int64_t gap; // stream session window gap - SqlFunctionCtx* pDummyCtx; // for combine - SSHashObj* pResultRows; - int32_t stateKeySize; - int16_t stateKeyType; - SDiskbasedBuf* pResultBuf; -} SStreamAggSupporter; - -typedef struct SWindowSupporter { - SStreamAggSupporter* pStreamAggSup; - int64_t gap; - uint16_t parentType; - SAggSupporter* pIntervalAggSup; -} SWindowSupporter; - -typedef struct SPartitionBySupporter { - SArray* pGroupCols; // group by columns, SArray - SArray* pGroupColVals; // current group column values, SArray - char* keyBuf; // group by keys for hash - bool needCalc; // partition by column -} SPartitionBySupporter; - -typedef struct SPartitionDataInfo { - uint64_t groupId; - char* tbname; - SArray* tags; - SArray* rowIds; -} SPartitionDataInfo; - -typedef struct STimeWindowAggSupp { - int8_t calTrigger; - int8_t calTriggerSaved; - int64_t deleteMark; - int64_t deleteMarkSaved; - int64_t waterMark; - TSKEY maxTs; - TSKEY minTs; - SColumnInfoData timeWindowData; // query time window info for scalar function execution. -} STimeWindowAggSupp; - -typedef struct SStreamScanInfo { - SExprInfo* pPseudoExpr; - int32_t numOfPseudoExpr; - SExprSupp tbnameCalSup; - SExprSupp tagCalSup; - int32_t primaryTsIndex; // primary time stamp slot id - SReadHandle readHandle; - SInterval interval; // if the upstream is an interval operator, the interval info is also kept here. - SColMatchInfo matchInfo; - - SArray* pBlockLists; // multiple SSDatablock. - SSDataBlock* pRes; // result SSDataBlock - SSDataBlock* pUpdateRes; // update SSDataBlock - int32_t updateResIndex; - int32_t blockType; // current block type - int32_t validBlockIndex; // Is current data has returned? - uint64_t numOfExec; // execution times - STqReader* tqReader; - - uint64_t groupId; - SUpdateInfo* pUpdateInfo; - - EStreamScanMode scanMode; - struct SOperatorInfo* pStreamScanOp; - struct SOperatorInfo* pTableScanOp; - SArray* childIds; - SWindowSupporter windowSup; - SPartitionBySupporter partitionSup; - SExprSupp* pPartScalarSup; - bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA. - int32_t scanWinIndex; // for state operator - int32_t pullDataResIndex; - SSDataBlock* pPullDataRes; // pull data SSDataBlock - SSDataBlock* pDeleteDataRes; // delete data SSDataBlock - int32_t deleteDataIndex; - STimeWindow updateWin; - STimeWindowAggSupp twAggSup; - SSDataBlock* pUpdateDataRes; - // status for tmq - SNodeList* pGroupTags; - SNode* pTagCond; - SNode* pTagIndexCond; - - // recover - int32_t blockRecoverContiCnt; - int32_t blockRecoverTotCnt; - SSDataBlock* pRecoverRes; - - SSDataBlock* pCreateTbRes; - int8_t igCheckUpdate; - int8_t igExpired; -} SStreamScanInfo; - -typedef struct { - SVnode* vnode; - SSDataBlock pRes; // result SSDataBlock - STsdbReader* dataReader; - SSnapContext* sContext; - STableListInfo* pTableListInfo; -} SStreamRawScanInfo; - -typedef struct STableCountScanSupp { - int16_t dbNameSlotId; - int16_t stbNameSlotId; - int16_t tbCountSlotId; - bool groupByDbName; - bool groupByStbName; - char dbNameFilter[TSDB_DB_NAME_LEN]; - char stbNameFilter[TSDB_TABLE_NAME_LEN]; -} STableCountScanSupp; - -typedef struct SOptrBasicInfo { - SResultRowInfo resultRowInfo; - SSDataBlock* pRes; - bool mergeResultBlock; -} SOptrBasicInfo; - -typedef struct SIntervalAggOperatorInfo { - SOptrBasicInfo binfo; // basic info - SAggSupporter aggSup; // aggregate supporter - SExprSupp scalarSupp; // supporter for perform scalar function - SGroupResInfo groupResInfo; // multiple results build supporter - SInterval interval; // interval info - int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator. - STimeWindow win; // query time range - bool timeWindowInterpo; // interpolation needed or not - SArray* pInterpCols; // interpolation columns - int32_t resultTsOrder; // result timestamp order - int32_t inputOrder; // input data ts order - EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] - STimeWindowAggSupp twAggSup; - SArray* pPrevValues; // SArray used to keep the previous not null value for interpolation. -} SIntervalAggOperatorInfo; - -typedef struct SMergeAlignedIntervalAggOperatorInfo { - SIntervalAggOperatorInfo* intervalAggOperatorInfo; - - uint64_t groupId; // current groupId - int64_t curTs; // current ts - SSDataBlock* prefetchedBlock; - SResultRow* pResultRow; -} SMergeAlignedIntervalAggOperatorInfo; - -typedef struct SStreamIntervalOperatorInfo { - SOptrBasicInfo binfo; // basic info - SAggSupporter aggSup; // aggregate supporter - SExprSupp scalarSupp; // supporter for perform scalar function - SGroupResInfo groupResInfo; // multiple results build supporter - SInterval interval; // interval info - int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator. - STimeWindowAggSupp twAggSup; - bool invertible; - bool ignoreExpiredData; - bool ignoreExpiredDataSaved; - SArray* pDelWins; // SWinRes - int32_t delIndex; - SSDataBlock* pDelRes; - SPhysiNode* pPhyNode; // create new child - SHashObj* pPullDataMap; - SArray* pPullWins; // SPullWindowInfo - int32_t pullIndex; - SSDataBlock* pPullDataRes; - bool isFinal; - SArray* pChildren; - SStreamState* pState; - SWinKey delKey; - uint64_t numOfDatapack; - SArray* pUpdated; - SSHashObj* pUpdatedMap; - int64_t dataVersion; -} SStreamIntervalOperatorInfo; - -typedef struct SDataGroupInfo { - uint64_t groupId; - int64_t numOfRows; - SArray* pPageList; -} SDataGroupInfo; - -typedef struct SWindowRowsSup { - STimeWindow win; - TSKEY prevTs; - int32_t startRowIndex; - int32_t numOfRows; - uint64_t groupId; -} SWindowRowsSup; - -typedef struct SResultWindowInfo { - void* pOutputBuf; - SSessionKey sessionWin; - bool isOutput; -} SResultWindowInfo; - -typedef struct SStateWindowInfo { - SResultWindowInfo winInfo; - SStateKeys* pStateKey; -} SStateWindowInfo; - -typedef struct SStreamSessionAggOperatorInfo { - SOptrBasicInfo binfo; - SStreamAggSupporter streamAggSup; - SExprSupp scalarSupp; // supporter for perform scalar function - SGroupResInfo groupResInfo; - int32_t primaryTsIndex; // primary timestamp slot id - int32_t endTsIndex; // window end timestamp slot id - int32_t order; // current SSDataBlock scan order - STimeWindowAggSupp twAggSup; - SSDataBlock* pWinBlock; // window result - SSDataBlock* pDelRes; // delete result - SSDataBlock* pUpdateRes; // update window - bool returnUpdate; - SSHashObj* pStDeleted; - void* pDelIterator; - SArray* pChildren; // cache for children's result; final stream operator - SPhysiNode* pPhyNode; // create new child - bool isFinal; - bool ignoreExpiredData; - bool ignoreExpiredDataSaved; - SArray* pUpdated; - SSHashObj* pStUpdated; - int64_t dataVersion; -} SStreamSessionAggOperatorInfo; - -typedef struct SStreamStateAggOperatorInfo { - SOptrBasicInfo binfo; - SStreamAggSupporter streamAggSup; - SExprSupp scalarSupp; // supporter for perform scalar function - SGroupResInfo groupResInfo; - int32_t primaryTsIndex; // primary timestamp slot id - STimeWindowAggSupp twAggSup; - SColumn stateCol; - SSDataBlock* pDelRes; - SSHashObj* pSeDeleted; - void* pDelIterator; - SArray* pChildren; // cache for children's result; - bool ignoreExpiredData; - bool ignoreExpiredDataSaved; - SArray* pUpdated; - SSHashObj* pSeUpdated; - int64_t dataVersion; -} SStreamStateAggOperatorInfo; - -typedef struct SStreamPartitionOperatorInfo { - SOptrBasicInfo binfo; - SPartitionBySupporter partitionSup; - SExprSupp scalarSup; - SExprSupp tbnameCalSup; - SExprSupp tagCalSup; - SHashObj* pPartitions; - void* parIte; - void* pTbNameIte; - SSDataBlock* pInputDataBlock; - int32_t tsColIndex; - SSDataBlock* pDelRes; - SSDataBlock* pCreateTbRes; -} SStreamPartitionOperatorInfo; - -typedef struct SStreamFillSupporter { - int32_t type; // fill type - SInterval interval; - SResultRowData prev; - SResultRowData cur; - SResultRowData next; - SResultRowData nextNext; - SFillColInfo* pAllColInfo; // fill exprs and not fill exprs - SExprSupp notFillExprSup; - int32_t numOfAllCols; // number of all exprs, including the tags columns - int32_t numOfFillCols; - int32_t numOfNotFillCols; - int32_t rowSize; - SSHashObj* pResMap; - bool hasDelete; -} SStreamFillSupporter; - -typedef struct SStreamFillOperatorInfo { - SStreamFillSupporter* pFillSup; - SSDataBlock* pRes; - SSDataBlock* pSrcBlock; - int32_t srcRowIndex; - SSDataBlock* pSrcDelBlock; - int32_t srcDelRowIndex; - SSDataBlock* pDelRes; - SColMatchInfo matchInfo; - int32_t primaryTsCol; - int32_t primarySrcSlotId; - SStreamFillInfo* pFillInfo; -} SStreamFillOperatorInfo; - -#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED) -#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED) - -SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode); -int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, const char* dbName, SExecTaskInfo* pTaskInfo); - -void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock); -void cleanupBasicInfo(SOptrBasicInfo* pInfo); - -int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr); -void cleanupExprSupp(SExprSupp* pSup); - -void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs); - -int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, - const char* pkey, void* pState); -void cleanupAggSup(SAggSupporter* pAggSup); - -void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows); - -void doBuildStreamResBlock(struct SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, - SDiskbasedBuf* pBuf); -void doBuildResultDatablock(struct SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, - SDiskbasedBuf* pBuf); - -bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo); -bool hasSlimitOffsetInfo(SLimitInfo* pLimitInfo); -void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo); -void resetLimitInfoForNextGroup(SLimitInfo* pLimitInfo); -bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); - -void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, - int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput); - -int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart); -void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs, - struct SOperatorInfo* pOperator); - -STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order); - -int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz); - -extern void doDestroyExchangeOperatorInfo(void* param); - -void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo); -int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock, - int32_t rows, const char* idStr, STableMetaCacheInfo* pCache); - -void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle); -void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name); - -void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset); -void clearResultRowInitFlag(SqlFunctionCtx* pCtx, int32_t numOfOutput); - -SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData, - int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo, - bool isIntervalQuery, SAggSupporter* pSup, bool keepGroup); - -int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, - int32_t numOfOutput, SArray* pPseudoList); - -void setInputDataBlock(SExprSupp* pExprSupp, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol); - -int32_t checkForQueryBuf(size_t numOfTables); - -SArray* getTableListInfo(const SExecTaskInfo* pTaskInfo); - -int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo* pTask, SReadHandle* readHandle); -int32_t getOperatorExplainExecInfo(struct SOperatorInfo* operatorInfo, SArray* pExecInfoList); - -STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval, - int32_t order); -int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey, - __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order); -int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); -SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize); -void getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId, SSessionKey* pKey); -bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap); -bool functionNeedToExecute(SqlFunctionCtx* pCtx); -bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup); -bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup); -bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup); -void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, - uint64_t* pGp, void* pTbName); -uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId); - -int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup, - SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); - -bool groupbyTbname(SNodeList* pGroupList); -int32_t buildDataBlockFromGroupRes(struct SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup, - SGroupResInfo* pGroupResInfo); -int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size); -int32_t buildSessionResultDataBlock(struct SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, - SExprSupp* pSup, SGroupResInfo* pGroupResInfo); -int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId, - SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup); -int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult); -int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize); -void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order); -int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos, int32_t order, - int64_t* pData); -void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, uint64_t groupId, - SSDataBlock* pSrcBlock, int32_t rowId, SSDataBlock* pDestBlock); - -SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag); -SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs); - -void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx, - SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo); -void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset); -void doClearBufferedBlocks(SStreamScanInfo* pInfo); - -#ifdef __cplusplus -} -#endif - -#endif // TDENGINE_EXECUTORIMPL_H diff --git a/source/libs/executor/inc/operator.h b/source/libs/executor/inc/operator.h index 92018b6e3e..632b817a07 100644 --- a/source/libs/executor/inc/operator.h +++ b/source/libs/executor/inc/operator.h @@ -157,6 +157,7 @@ void destroyOperator(SOperatorInfo* pOperator); SOperatorInfo* extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, const char* id); int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder); int32_t stopTableScanOperator(SOperatorInfo* pOperator, const char* pIdStr); +int32_t getOperatorExplainExecInfo(struct SOperatorInfo* operatorInfo, SArray* pExecInfoList); #ifdef __cplusplus } diff --git a/source/libs/executor/inc/querytask.h b/source/libs/executor/inc/querytask.h index aa42cd2c14..8852265da0 100644 --- a/source/libs/executor/inc/querytask.h +++ b/source/libs/executor/inc/querytask.h @@ -22,6 +22,17 @@ extern "C" { #define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.str) +enum { + // when this task starts to execute, this status will set + TASK_NOT_COMPLETED = 0x1u, + + /* Task is over + * 1. this status is used in one row result query process, e.g., count/sum/first/last/ avg...etc. + * 2. when all data within queried time window, it is also denoted as query_completed + */ + TASK_COMPLETED = 0x2u, +}; + typedef struct STaskIdInfo { uint64_t queryId; // this is also a request id uint64_t subplanId; @@ -44,6 +55,23 @@ typedef struct STaskStopInfo { SArray* pStopInfo; } STaskStopInfo; +typedef struct { + STqOffsetVal currentOffset; // for tmq + SMqMetaRsp metaRsp; // for tmq fetching meta + int64_t snapshotVer; + SPackedData submit; // todo remove it + SSchemaWrapper* schema; + char tbName[TSDB_TABLE_NAME_LEN]; // this is the current scan table: todo refactor + int8_t recoverStep; + int8_t recoverScanFinished; + SQueryTableDataCond tableCond; + int64_t fillHistoryVer1; + int64_t fillHistoryVer2; + SStreamState* pState; + int64_t dataVersion; + int64_t checkPointId; +} SStreamTaskInfo; + struct SExecTaskInfo { STaskIdInfo id; uint32_t status; @@ -75,6 +103,7 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, int32_t vgId, char* sql, EOPTR_EXEC_MODEL model); int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo); +SArray* getTableListInfo(const SExecTaskInfo* pTaskInfo); #ifdef __cplusplus } diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index c00c167d8e..9b463a3dee 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -20,16 +20,16 @@ #include "tfill.h" #include "tname.h" -#include "tdatablock.h" -#include "tglobal.h" -#include "executorimpl.h" +#include "executorInt.h" #include "index.h" +#include "operator.h" #include "query.h" +#include "querytask.h" #include "tcompare.h" +#include "tdatablock.h" +#include "tglobal.h" #include "thash.h" #include "ttypes.h" -#include "operator.h" -#include "querytask.h" typedef struct { bool hasAgg; diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 6fee0899ce..eec34a6406 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -20,12 +20,12 @@ #include "tdatablock.h" #include "tmsg.h" -#include "executorimpl.h" +#include "executorInt.h" +#include "operator.h" +#include "querytask.h" #include "tcompare.h" #include "thash.h" #include "ttypes.h" -#include "operator.h" -#include "querytask.h" typedef struct SCacheRowsScanInfo { SSDataBlock* pRes; diff --git a/source/libs/executor/src/dataDeleter.c b/source/libs/executor/src/dataDeleter.c index 96d061fc04..11074b0e94 100644 --- a/source/libs/executor/src/dataDeleter.c +++ b/source/libs/executor/src/dataDeleter.c @@ -15,7 +15,7 @@ #include "dataSinkInt.h" #include "dataSinkMgt.h" -#include "executorimpl.h" +#include "executorInt.h" #include "planner.h" #include "tcompression.h" #include "tdatablock.h" diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 8e32559fac..ce8dc898a5 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -15,7 +15,7 @@ #include "dataSinkInt.h" #include "dataSinkMgt.h" -#include "executorimpl.h" +#include "executorInt.h" #include "planner.h" #include "tcompression.h" #include "tdatablock.h" diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index 22f388d406..33eccf4759 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -15,7 +15,7 @@ #include "dataSinkInt.h" #include "dataSinkMgt.h" -#include "executorimpl.h" +#include "executorInt.h" #include "planner.h" #include "tcompression.h" #include "tdatablock.h" diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index 8ec7ed91a3..956d5b714d 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -13,16 +13,16 @@ * along with this program. If not, see . */ -#include "executorimpl.h" +#include "executorInt.h" #include "filter.h" #include "function.h" #include "functionMgt.h" +#include "operator.h" +#include "querytask.h" #include "tcommon.h" #include "tcompare.h" #include "tdatablock.h" #include "ttime.h" -#include "operator.h" -#include "querytask.h" typedef struct SEventWindowOperatorInfo { SOptrBasicInfo binfo; diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 31445dd568..94041140d4 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -13,19 +13,19 @@ * along with this program. If not, see . */ +#include "executorInt.h" #include "filter.h" #include "function.h" -#include "os.h" -#include "tname.h" -#include "tref.h" -#include "tdatablock.h" -#include "tmsg.h" -#include "executorimpl.h" #include "index.h" -#include "query.h" -#include "thash.h" #include "operator.h" +#include "os.h" +#include "query.h" #include "querytask.h" +#include "tdatablock.h" +#include "thash.h" +#include "tmsg.h" +#include "tname.h" +#include "tref.h" typedef struct SFetchRspHandleWrapper { uint32_t exchangeId; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 75e1d374e4..c51dc39b5b 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -24,9 +24,9 @@ #include "ttime.h" #include "executil.h" -#include "executorimpl.h" -#include "tcompression.h" +#include "executorInt.h" #include "querytask.h" +#include "tcompression.h" typedef struct STableListIdInfo { uint64_t suid; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 2b9ab6c09a..7a5715e5ed 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -14,14 +14,14 @@ */ #include "executor.h" -#include "executorimpl.h" +#include "executorInt.h" +#include "operator.h" #include "planner.h" +#include "querytask.h" #include "tdatablock.h" #include "tref.h" #include "tudf.h" #include "vnode.h" -#include "operator.h" -#include "querytask.h" static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT; int32_t exchangeObjRefPool = -1; @@ -718,8 +718,6 @@ void qRemoveTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo) { taosArrayRemove(pTaskInfo->stopInfo.pStopInfo, idx); } taosWUnLockLatch(&pTaskInfo->stopInfo.lock); - - return; } void qStopTaskOperators(SExecTaskInfo* pTaskInfo) { @@ -1304,3 +1302,25 @@ SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo) { taosArrayDestroy(plist); return pUidList; } + +static void extractTableList(SArray* pList, const SOperatorInfo* pOperator) { + if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { + SStreamScanInfo* pScanInfo = pOperator->info; + STableScanInfo* pTableScanInfo = pScanInfo->pTableScanOp->info; + taosArrayPush(pList, &pTableScanInfo->base.pTableListInfo); + } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { + STableScanInfo* pScanInfo = pOperator->info; + taosArrayPush(pList, &pScanInfo->base.pTableListInfo); + } else { + if (pOperator->pDownstream != NULL && pOperator->pDownstream[0] != NULL) { + extractTableList(pList, pOperator->pDownstream[0]); + } + } +} + +SArray* getTableListInfo(const SExecTaskInfo* pTaskInfo) { + SArray* pArray = taosArrayInit(0, POINTER_BYTES); + SOperatorInfo* pOperator = pTaskInfo->pRoot; + extractTableList(pArray, pOperator); + return pArray; +} \ No newline at end of file diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorInt.c similarity index 95% rename from source/libs/executor/src/executorimpl.c rename to source/libs/executor/src/executorInt.c index 3ab929cad0..f525f6728c 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorInt.c @@ -25,15 +25,15 @@ #include "tmsg.h" #include "ttime.h" -#include "executorimpl.h" +#include "executorInt.h" #include "index.h" +#include "operator.h" #include "query.h" +#include "querytask.h" #include "tcompare.h" #include "thash.h" #include "ttypes.h" #include "vnode.h" -#include "operator.h" -#include "querytask.h" #define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN) #define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP) @@ -1059,37 +1059,6 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo* return TSDB_CODE_SUCCESS; } -int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList) { - SExplainExecInfo execInfo = {0}; - SExplainExecInfo* pExplainInfo = taosArrayPush(pExecInfoList, &execInfo); - - pExplainInfo->numOfRows = operatorInfo->resultInfo.totalRows; - pExplainInfo->startupCost = operatorInfo->cost.openCost; - pExplainInfo->totalCost = operatorInfo->cost.totalCost; - pExplainInfo->verboseLen = 0; - pExplainInfo->verboseInfo = NULL; - - if (operatorInfo->fpSet.getExplainFn) { - int32_t code = - operatorInfo->fpSet.getExplainFn(operatorInfo, &pExplainInfo->verboseInfo, &pExplainInfo->verboseLen); - if (code) { - qError("%s operator getExplainFn failed, code:%s", GET_TASKID(operatorInfo->pTaskInfo), tstrerror(code)); - return code; - } - } - - int32_t code = 0; - for (int32_t i = 0; i < operatorInfo->numOfDownstream; ++i) { - code = getOperatorExplainExecInfo(operatorInfo->pDownstream[i], pExecInfoList); - if (code != TSDB_CODE_SUCCESS) { - // taosMemoryFreeClear(*pRes); - return TSDB_CODE_OUT_OF_MEMORY; - } - } - - return TSDB_CODE_SUCCESS; -} - int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup) { SWinKey key = { @@ -1331,25 +1300,3 @@ void qStreamCloseTsdbReader(void* task) { } } } - -static void extractTableList(SArray* pList, const SOperatorInfo* pOperator) { - if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { - SStreamScanInfo* pScanInfo = pOperator->info; - STableScanInfo* pTableScanInfo = pScanInfo->pTableScanOp->info; - taosArrayPush(pList, &pTableScanInfo->base.pTableListInfo); - } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { - STableScanInfo* pScanInfo = pOperator->info; - taosArrayPush(pList, &pScanInfo->base.pTableListInfo); - } else { - if (pOperator->pDownstream != NULL && pOperator->pDownstream[0] != NULL) { - extractTableList(pList, pOperator->pDownstream[0]); - } - } -} - -SArray* getTableListInfo(const SExecTaskInfo* pTaskInfo) { - SArray* pArray = taosArrayInit(0, POINTER_BYTES); - SOperatorInfo* pOperator = pTaskInfo->pRoot; - extractTableList(pArray, pOperator); - return pArray; -} \ No newline at end of file diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index e465e6a525..0ac9e6097f 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -20,7 +20,7 @@ #include "tmsg.h" #include "ttypes.h" -#include "executorimpl.h" +#include "executorInt.h" #include "tcommon.h" #include "thash.h" #include "ttime.h" diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 821db4244e..47338d4469 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -22,12 +22,11 @@ #include "tmsg.h" #include "executorInt.h" -#include "executorimpl.h" +#include "operator.h" +#include "querytask.h" #include "tcompare.h" #include "thash.h" #include "ttypes.h" -#include "operator.h" -#include "querytask.h" typedef struct SGroupbyOperatorInfo { SOptrBasicInfo binfo; diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index b4862a490f..754b5f4737 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -13,18 +13,18 @@ * along with this program. If not, see . */ +#include "executorInt.h" #include "filter.h" -#include "executorimpl.h" #include "function.h" +#include "operator.h" #include "os.h" #include "querynodes.h" +#include "querytask.h" #include "tcompare.h" #include "tdatablock.h" #include "thash.h" #include "tmsg.h" #include "ttypes.h" -#include "operator.h" -#include "querytask.h" typedef struct SJoinRowCtx { bool rowRemains; diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 5e384ca6dd..729178dc60 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -20,12 +20,12 @@ #include "tglobal.h" -#include "executorimpl.h" +#include "executorInt.h" #include "index.h" -#include "query.h" -#include "vnode.h" #include "operator.h" +#include "query.h" #include "querytask.h" +#include "vnode.h" SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn, @@ -75,28 +75,6 @@ void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, b pOperator->pTaskInfo = pTaskInfo; } -void destroyOperator(SOperatorInfo* pOperator) { - if (pOperator == NULL) { - return; - } - - if (pOperator->fpSet.closeFn != NULL) { - pOperator->fpSet.closeFn(pOperator->info); - } - - if (pOperator->pDownstream != NULL) { - for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) { - destroyOperator(pOperator->pDownstream[i]); - } - - taosMemoryFreeClear(pOperator->pDownstream); - pOperator->numOfDownstream = 0; - } - - cleanupExprSupp(&pOperator->exprSupp); - taosMemoryFreeClear(pOperator); -} - // each operator should be set their own function to return total cost buffer int32_t optrDefaultBufFn(SOperatorInfo* pOperator) { if (pOperator->blocking) { @@ -106,40 +84,6 @@ int32_t optrDefaultBufFn(SOperatorInfo* pOperator) { } } -//int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder) { -// // todo add more information about exchange operation -// int32_t type = pOperator->operatorType; -// if (type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN || -// type == QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN || -// type == QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN) { -// *order = TSDB_ORDER_ASC; -// *scanFlag = MAIN_SCAN; -// return TSDB_CODE_SUCCESS; -// } else if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) { -// if (!inheritUsOrder) { -// *order = TSDB_ORDER_ASC; -// } -// *scanFlag = MAIN_SCAN; -// return TSDB_CODE_SUCCESS; -// } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { -// STableScanInfo* pTableScanInfo = pOperator->info; -// *order = pTableScanInfo->base.cond.order; -// *scanFlag = pTableScanInfo->base.scanFlag; -// return TSDB_CODE_SUCCESS; -// } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN) { -// STableMergeScanInfo* pTableScanInfo = pOperator->info; -// *order = pTableScanInfo->base.cond.order; -// *scanFlag = pTableScanInfo->base.scanFlag; -// return TSDB_CODE_SUCCESS; -// } else { -// if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) { -// return TSDB_CODE_INVALID_PARA; -// } else { -// return getTableScanInfo(pOperator->pDownstream[0], order, scanFlag, inheritUsOrder); -// } -// } -//} - static int64_t getQuerySupportBufSize(size_t numOfTables) { size_t s1 = sizeof(STableQueryInfo); // size_t s3 = sizeof(STableCheckInfo); buffer consumption in tsdb @@ -319,7 +263,7 @@ int32_t stopTableScanOperator(SOperatorInfo* pOperator, const char* pIdStr) { } SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond, - SNode* pTagIndexCond, const char* pUser, const char* dbname) { + SNode* pTagIndexCond, const char* pUser, const char* dbname) { int32_t type = nodeType(pPhyNode); const char* idstr = GET_TASKID(pTaskInfo); @@ -347,7 +291,7 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR code = initQueriedTableSchemaInfo(pHandle, &pTableScanNode->scan, dbname, pTaskInfo); if (code) { - pTaskInfo->code = terrno; + pTaskInfo->code = code; tableListDestroy(pTableListInfo); return NULL; } @@ -355,6 +299,7 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo); if (NULL == pOperator) { pTaskInfo->code = terrno; + tableListDestroy(pTableListInfo); return NULL; } @@ -578,3 +523,56 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR return pOptr; } + +void destroyOperator(SOperatorInfo* pOperator) { + if (pOperator == NULL) { + return; + } + + if (pOperator->fpSet.closeFn != NULL) { + pOperator->fpSet.closeFn(pOperator->info); + } + + if (pOperator->pDownstream != NULL) { + for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) { + destroyOperator(pOperator->pDownstream[i]); + } + + taosMemoryFreeClear(pOperator->pDownstream); + pOperator->numOfDownstream = 0; + } + + cleanupExprSupp(&pOperator->exprSupp); + taosMemoryFreeClear(pOperator); +} + +int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList) { + SExplainExecInfo execInfo = {0}; + SExplainExecInfo* pExplainInfo = taosArrayPush(pExecInfoList, &execInfo); + + pExplainInfo->numOfRows = operatorInfo->resultInfo.totalRows; + pExplainInfo->startupCost = operatorInfo->cost.openCost; + pExplainInfo->totalCost = operatorInfo->cost.totalCost; + pExplainInfo->verboseLen = 0; + pExplainInfo->verboseInfo = NULL; + + if (operatorInfo->fpSet.getExplainFn) { + int32_t code = + operatorInfo->fpSet.getExplainFn(operatorInfo, &pExplainInfo->verboseInfo, &pExplainInfo->verboseLen); + if (code) { + qError("%s operator getExplainFn failed, code:%s", GET_TASKID(operatorInfo->pTaskInfo), tstrerror(code)); + return code; + } + } + + int32_t code = 0; + for (int32_t i = 0; i < operatorInfo->numOfDownstream; ++i) { + code = getOperatorExplainExecInfo(operatorInfo->pDownstream[i], pExecInfoList); + if (code != TSDB_CODE_SUCCESS) { + // taosMemoryFreeClear(*pRes); + return TSDB_CODE_OUT_OF_MEMORY; + } + } + + return TSDB_CODE_SUCCESS; +} diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 01ee18b41f..02f504bef0 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "executorimpl.h" +#include "executorInt.h" #include "filter.h" #include "functionMgt.h" #include "operator.h" diff --git a/source/libs/executor/src/querytask.c b/source/libs/executor/src/querytask.c index ed209efed0..b6b250a325 100644 --- a/source/libs/executor/src/querytask.c +++ b/source/libs/executor/src/querytask.c @@ -24,14 +24,14 @@ #include "tdatablock.h" #include "tmsg.h" -#include "executorimpl.h" +#include "executorInt.h" #include "index.h" +#include "operator.h" #include "query.h" +#include "querytask.h" #include "thash.h" #include "ttypes.h" #include "vnode.h" -#include "operator.h" -#include "querytask.h" #define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st))) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index a717c8b7c0..130cca9cbb 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "executorimpl.h" +#include "executorInt.h" #include "filter.h" #include "function.h" #include "functionMgt.h" diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 0b23a803c6..10933f285c 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -13,11 +13,11 @@ * along with this program. If not, see . */ +#include "executorInt.h" #include "filter.h" -#include "executorimpl.h" -#include "tdatablock.h" #include "operator.h" #include "querytask.h" +#include "tdatablock.h" typedef struct SSortOperatorInfo { SOptrBasicInfo binfo; diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index bc850b5333..c75c49fe77 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "executorimpl.h" +#include "executorInt.h" #include "filter.h" #include "function.h" #include "functionMgt.h" diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index 8376caace0..fc4e82b57f 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -20,7 +20,7 @@ #include "tmsg.h" #include "ttypes.h" -#include "executorimpl.h" +#include "executorInt.h" #include "tcommon.h" #include "thash.h" #include "ttime.h" diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index a688920a22..29e3668ec4 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -12,17 +12,17 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -#include "executorimpl.h" +#include "executorInt.h" #include "filter.h" #include "function.h" #include "functionMgt.h" +#include "operator.h" +#include "querytask.h" #include "tcommon.h" #include "tcompare.h" #include "tdatablock.h" #include "tfill.h" #include "ttime.h" -#include "operator.h" -#include "querytask.h" typedef struct STimeSliceOperatorInfo { SSDataBlock* pRes; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index ab9b0987fa..bea01fa0d8 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -12,21 +12,27 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -#include "executorimpl.h" +#include "executorInt.h" #include "filter.h" #include "function.h" #include "functionMgt.h" +#include "operator.h" +#include "querytask.h" #include "tcommon.h" #include "tcompare.h" #include "tdatablock.h" #include "tfill.h" #include "ttime.h" -#include "operator.h" -#include "querytask.h" #define IS_FINAL_OP(op) ((op)->isFinal) #define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL); +typedef struct SStateWindowInfo { + SResultWindowInfo winInfo; + SStateKeys* pStateKey; +} SStateWindowInfo; + + typedef struct SSessionAggOperatorInfo { SOptrBasicInfo binfo; SAggSupporter aggSup; diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index db0678f214..cefe12990d 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -24,13 +24,13 @@ #include "os.h" #include "executor.h" -#include "executorimpl.h" +#include "executorInt.h" #include "function.h" +#include "operator.h" #include "taos.h" #include "tdatablock.h" #include "tdef.h" #include "tvariant.h" -#include "operator.h" namespace { diff --git a/source/libs/executor/test/lhashTests.cpp b/source/libs/executor/test/lhashTests.cpp index 24570ff788..92f7652d8d 100644 --- a/source/libs/executor/test/lhashTests.cpp +++ b/source/libs/executor/test/lhashTests.cpp @@ -15,7 +15,7 @@ #include #include -#include "executorimpl.h" +#include "executorInt.h" #include "tlinearhash.h" #pragma GCC diagnostic push diff --git a/source/libs/executor/test/sortTests.cpp b/source/libs/executor/test/sortTests.cpp index f35d07804e..8122d7d6a9 100644 --- a/source/libs/executor/test/sortTests.cpp +++ b/source/libs/executor/test/sortTests.cpp @@ -26,7 +26,7 @@ #include "os.h" #include "executor.h" -#include "executorimpl.h" +#include "executorInt.h" #include "taos.h" #include "tcompare.h" #include "tdatablock.h" -- GitLab