diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c
index c164d037e0bc6e124aa5edfbe6a3570502afbfcf..8cda5a21c9263e34afd8268490bcb1728a03fbda 100644
--- a/source/dnode/vnode/src/tq/tqRestore.c
+++ b/source/dnode/vnode/src/tq/tqRestore.c
@@ -107,7 +107,7 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
if (streamTaskShouldStop(&pTask->status) || status == TASK_STATUS__RECOVER_PREPARE ||
status == TASK_STATUS__WAIT_DOWNSTREAM) {
- tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr, status);
+ tqDebug("s-task:%s skip push data, not ready for processing, status:%d", pTask->id.idStr, status);
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c
index 0f00a5acb86d05b30ab753f1a7ba7517e04325fa..8e243a8bd100d1ba11582f58093c3dc955a5f260 100644
--- a/source/dnode/vnode/src/tq/tqScan.c
+++ b/source/dnode/vnode/src/tq/tqScan.c
@@ -130,6 +130,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
tqError("vgId:%d, task exec error since %s", pTq->pVnode->config.vgId, terrstr());
return -1;
}
+
tqDebug("tmqsnap task execute end, get %p", pDataBlock);
if (pDataBlock != NULL && pDataBlock->info.rows > 0) {
diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h
index d22a7460bb17f53e77c56ec33c177ab466919b47..a4f1e2ef9418115c24a10429092216acf702b156 100644
--- a/source/libs/executor/inc/executorInt.h
+++ b/source/libs/executor/inc/executorInt.h
@@ -12,14 +12,75 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
-
-#ifndef _TD_EXECUTOR_INT_H
-#define _TD_EXECUTOR_INT_H
+#ifndef TDENGINE_EXECUTORINT_H
+#define TDENGINE_EXECUTORINT_H
#ifdef __cplusplus
extern "C" {
#endif
+#include "os.h"
+#include "tcommon.h"
+#include "tlosertree.h"
+#include "tsort.h"
+#include "ttszip.h"
+#include "tvariant.h"
+
+#include "dataSinkMgt.h"
+#include "executil.h"
+#include "executor.h"
+#include "planner.h"
+#include "scalar.h"
+#include "taosdef.h"
+#include "tarray.h"
+#include "tfill.h"
+#include "thash.h"
+#include "tlockfree.h"
+#include "tmsg.h"
+#include "tpagedbuf.h"
+#include "tstream.h"
+#include "tstreamUpdate.h"
+
+#include "vnode.h"
+
+typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
+
+#define IS_VALID_SESSION_WIN(winInfo) ((winInfo).sessionWin.win.skey > 0)
+#define SET_SESSION_WIN_INVALID(winInfo) ((winInfo).sessionWin.win.skey = INT64_MIN)
+#define IS_INVALID_SESSION_WIN_KEY(winKey) ((winKey).win.skey <= 0)
+#define SET_SESSION_WIN_KEY_INVALID(pWinKey) ((pWinKey)->win.skey = INT64_MIN)
+
+/**
+ * If the number of generated results is greater than this value,
+ * query query will be halt and return results to client immediate.
+ */
+typedef struct SResultInfo { // TODO refactor
+ int64_t totalRows; // total generated result size in rows
+ int64_t totalBytes; // total results in bytes.
+ int32_t capacity; // capacity of current result output buffer
+ int32_t threshold; // result size threshold in rows.
+} SResultInfo;
+
+typedef struct STableQueryInfo {
+ TSKEY lastKey; // last check ts, todo remove it later
+ SResultRowPosition pos; // current active time window
+} STableQueryInfo;
+
+typedef struct SLimit {
+ int64_t limit;
+ int64_t offset;
+} SLimit;
+
+typedef struct STableScanAnalyzeInfo SFileBlockLoadRecorder;
+
+enum {
+ STREAM_RECOVER_STEP__NONE = 0,
+ STREAM_RECOVER_STEP__PREPARE1,
+ STREAM_RECOVER_STEP__PREPARE2,
+ STREAM_RECOVER_STEP__SCAN1,
+ STREAM_RECOVER_STEP__SCAN2,
+};
+
extern int32_t exchangeObjRefPool;
typedef struct {
@@ -29,9 +90,584 @@ typedef struct {
int32_t bytes;
} SGroupKeys, SStateKeys;
+typedef struct {
+ char* tablename;
+ char* dbname;
+ int32_t tversion;
+ SSchemaWrapper* sw;
+ SSchemaWrapper* qsw;
+} SSchemaInfo;
+
+typedef struct SExchangeOpStopInfo {
+ int32_t operatorType;
+ int64_t refId;
+} SExchangeOpStopInfo;
+
+typedef struct SExprSupp {
+ SExprInfo* pExprInfo;
+ int32_t numOfExprs; // the number of scalar expression in group operator
+ SqlFunctionCtx* pCtx;
+ int32_t* rowEntryInfoOffset; // offset value for each row result cell info
+ SFilterInfo* pFilterInfo;
+} SExprSupp;
+
+typedef enum {
+ EX_SOURCE_DATA_NOT_READY = 0x1,
+ EX_SOURCE_DATA_READY = 0x2,
+ EX_SOURCE_DATA_EXHAUSTED = 0x3,
+} EX_SOURCE_STATUS;
+
+#define COL_MATCH_FROM_COL_ID 0x1
+#define COL_MATCH_FROM_SLOT_ID 0x2
+
+typedef struct SLoadRemoteDataInfo {
+ uint64_t totalSize; // total load bytes from remote
+ uint64_t totalRows; // total number of rows
+ uint64_t totalElapsed; // total elapsed time
+} SLoadRemoteDataInfo;
+
+typedef struct SLimitInfo {
+ SLimit limit;
+ SLimit slimit;
+ uint64_t currentGroupId;
+ int64_t remainGroupOffset;
+ int64_t numOfOutputGroups;
+ int64_t remainOffset;
+ int64_t numOfOutputRows;
+} SLimitInfo;
+
+typedef struct SExchangeInfo {
+ SArray* pSources;
+ SArray* pSourceDataInfo;
+ tsem_t ready;
+ void* pTransporter;
+
+ // SArray, result block list, used to keep the multi-block that
+ // passed by downstream operator
+ SArray* pResultBlockList;
+ SArray* pRecycledBlocks; // build a pool for small data block to avoid to repeatly create and then destroy.
+ SSDataBlock* pDummyBlock; // dummy block, not keep data
+ bool seqLoadData; // sequential load data or not, false by default
+ int32_t current;
+ SLoadRemoteDataInfo loadInfo;
+ uint64_t self;
+ SLimitInfo limitInfo;
+ int64_t openedTs; // start exec time stamp, todo: move to SLoadRemoteDataInfo
+} SExchangeInfo;
+
+typedef struct SScanInfo {
+ int32_t numOfAsc;
+ int32_t numOfDesc;
+} SScanInfo;
+
+typedef struct SSampleExecInfo {
+ double sampleRatio; // data block sample ratio, 1 by default
+ uint32_t seed; // random seed value
+} SSampleExecInfo;
+
+enum {
+ TABLE_SCAN__TABLE_ORDER = 1,
+ TABLE_SCAN__BLOCK_ORDER = 2,
+};
+
+typedef struct SAggSupporter {
+ SSHashObj* pResultRowHashTable; // quick locate the window object for each result
+ char* keyBuf; // window key buffer
+ SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
+ int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
+ int32_t currentPageId; // current write page id
+} SAggSupporter;
+
+typedef struct {
+ // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if
+ // current data block needs to be loaded.
+ SInterval interval;
+ SAggSupporter* pAggSup;
+ SExprSupp* pExprSup; // expr supporter of aggregate operator
+} SAggOptrPushDownInfo;
+
+typedef struct STableMetaCacheInfo {
+ SLRUCache* pTableMetaEntryCache; // 100 by default
+ uint64_t metaFetch;
+ uint64_t cacheHit;
+} STableMetaCacheInfo;
+
+typedef struct STableScanBase {
+ STsdbReader* dataReader;
+ SFileBlockLoadRecorder readRecorder;
+ SQueryTableDataCond cond;
+ SAggOptrPushDownInfo pdInfo;
+ SColMatchInfo matchInfo;
+ SReadHandle readHandle;
+ SExprSupp pseudoSup;
+ STableMetaCacheInfo metaCache;
+ int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
+ int32_t dataBlockLoadFlag;
+ SLimitInfo limitInfo;
+ // there are more than one table list exists in one task, if only one vnode exists.
+ STableListInfo* pTableListInfo;
+} STableScanBase;
+
+typedef struct STableScanInfo {
+ STableScanBase base;
+ SScanInfo scanInfo;
+ int32_t scanTimes;
+ SSDataBlock* pResBlock;
+ SSampleExecInfo sample; // sample execution info
+ int32_t currentGroupId;
+ int32_t currentTable;
+ int8_t scanMode;
+ int8_t assignBlockUid;
+ bool hasGroupByTag;
+ bool countOnly;
+} STableScanInfo;
+
+typedef struct STableMergeScanInfo {
+ int32_t tableStartIndex;
+ int32_t tableEndIndex;
+ bool hasGroupId;
+ uint64_t groupId;
+ SArray* queryConds; // array of queryTableDataCond
+ STableScanBase base;
+ int32_t bufPageSize;
+ uint32_t sortBufSize; // max buffer size for in-memory sort
+ SArray* pSortInfo;
+ SSortHandle* pSortHandle;
+ SSDataBlock* pSortInputBlock;
+ int64_t startTs; // sort start time
+ SArray* sortSourceParams;
+ SLimitInfo limitInfo;
+ int64_t numOfRows;
+ SScanInfo scanInfo;
+ int32_t scanTimes;
+ SSDataBlock* pResBlock;
+ SSampleExecInfo sample; // sample execution info
+ SSortExecInfo sortExecInfo;
+} STableMergeScanInfo;
+
+typedef struct STagScanInfo {
+ SColumnInfo* pCols;
+ SSDataBlock* pRes;
+ SColMatchInfo matchInfo;
+ int32_t curPos;
+ SReadHandle readHandle;
+ STableListInfo* pTableListInfo;
+} STagScanInfo;
+
+typedef enum EStreamScanMode {
+ STREAM_SCAN_FROM_READERHANDLE = 1,
+ STREAM_SCAN_FROM_RES,
+ STREAM_SCAN_FROM_UPDATERES,
+ STREAM_SCAN_FROM_DELETE_DATA,
+ STREAM_SCAN_FROM_DATAREADER_RETRIEVE,
+ STREAM_SCAN_FROM_DATAREADER_RANGE,
+} EStreamScanMode;
+
+enum {
+ PROJECT_RETRIEVE_CONTINUE = 0x1,
+ PROJECT_RETRIEVE_DONE = 0x2,
+};
+
+typedef struct SStreamAggSupporter {
+ int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
+ SSDataBlock* pScanBlock;
+ SStreamState* pState;
+ int64_t gap; // stream session window gap
+ SqlFunctionCtx* pDummyCtx; // for combine
+ SSHashObj* pResultRows;
+ int32_t stateKeySize;
+ int16_t stateKeyType;
+ SDiskbasedBuf* pResultBuf;
+} SStreamAggSupporter;
+
+typedef struct SWindowSupporter {
+ SStreamAggSupporter* pStreamAggSup;
+ int64_t gap;
+ uint16_t parentType;
+ SAggSupporter* pIntervalAggSup;
+} SWindowSupporter;
+
+typedef struct SPartitionBySupporter {
+ SArray* pGroupCols; // group by columns, SArray
+ SArray* pGroupColVals; // current group column values, SArray
+ char* keyBuf; // group by keys for hash
+ bool needCalc; // partition by column
+} SPartitionBySupporter;
+
+typedef struct SPartitionDataInfo {
+ uint64_t groupId;
+ char* tbname;
+ SArray* tags;
+ SArray* rowIds;
+} SPartitionDataInfo;
+
+typedef struct STimeWindowAggSupp {
+ int8_t calTrigger;
+ int8_t calTriggerSaved;
+ int64_t deleteMark;
+ int64_t deleteMarkSaved;
+ int64_t waterMark;
+ TSKEY maxTs;
+ TSKEY minTs;
+ SColumnInfoData timeWindowData; // query time window info for scalar function execution.
+} STimeWindowAggSupp;
+
+typedef struct SStreamScanInfo {
+ SExprInfo* pPseudoExpr;
+ int32_t numOfPseudoExpr;
+ SExprSupp tbnameCalSup;
+ SExprSupp tagCalSup;
+ int32_t primaryTsIndex; // primary time stamp slot id
+ SReadHandle readHandle;
+ SInterval interval; // if the upstream is an interval operator, the interval info is also kept here.
+ SColMatchInfo matchInfo;
+
+ SArray* pBlockLists; // multiple SSDatablock.
+ SSDataBlock* pRes; // result SSDataBlock
+ SSDataBlock* pUpdateRes; // update SSDataBlock
+ int32_t updateResIndex;
+ int32_t blockType; // current block type
+ int32_t validBlockIndex; // Is current data has returned?
+ uint64_t numOfExec; // execution times
+ STqReader* tqReader;
+
+ uint64_t groupId;
+ SUpdateInfo* pUpdateInfo;
+
+ EStreamScanMode scanMode;
+ struct SOperatorInfo* pStreamScanOp;
+ struct SOperatorInfo* pTableScanOp;
+ SArray* childIds;
+ SWindowSupporter windowSup;
+ SPartitionBySupporter partitionSup;
+ SExprSupp* pPartScalarSup;
+ bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA.
+ int32_t scanWinIndex; // for state operator
+ int32_t pullDataResIndex;
+ SSDataBlock* pPullDataRes; // pull data SSDataBlock
+ SSDataBlock* pDeleteDataRes; // delete data SSDataBlock
+ int32_t deleteDataIndex;
+ STimeWindow updateWin;
+ STimeWindowAggSupp twAggSup;
+ SSDataBlock* pUpdateDataRes;
+ // status for tmq
+ SNodeList* pGroupTags;
+ SNode* pTagCond;
+ SNode* pTagIndexCond;
+
+ // recover
+ int32_t blockRecoverContiCnt;
+ int32_t blockRecoverTotCnt;
+ SSDataBlock* pRecoverRes;
+
+ SSDataBlock* pCreateTbRes;
+ int8_t igCheckUpdate;
+ int8_t igExpired;
+} SStreamScanInfo;
+
+typedef struct {
+ SVnode* vnode;
+ SSDataBlock pRes; // result SSDataBlock
+ STsdbReader* dataReader;
+ SSnapContext* sContext;
+ STableListInfo* pTableListInfo;
+} SStreamRawScanInfo;
+
+typedef struct STableCountScanSupp {
+ int16_t dbNameSlotId;
+ int16_t stbNameSlotId;
+ int16_t tbCountSlotId;
+ bool groupByDbName;
+ bool groupByStbName;
+ char dbNameFilter[TSDB_DB_NAME_LEN];
+ char stbNameFilter[TSDB_TABLE_NAME_LEN];
+} STableCountScanSupp;
+
+typedef struct SOptrBasicInfo {
+ SResultRowInfo resultRowInfo;
+ SSDataBlock* pRes;
+ bool mergeResultBlock;
+} SOptrBasicInfo;
+
+typedef struct SIntervalAggOperatorInfo {
+ SOptrBasicInfo binfo; // basic info
+ SAggSupporter aggSup; // aggregate supporter
+ SExprSupp scalarSupp; // supporter for perform scalar function
+ SGroupResInfo groupResInfo; // multiple results build supporter
+ SInterval interval; // interval info
+ int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator.
+ STimeWindow win; // query time range
+ bool timeWindowInterpo; // interpolation needed or not
+ SArray* pInterpCols; // interpolation columns
+ int32_t resultTsOrder; // result timestamp order
+ int32_t inputOrder; // input data ts order
+ EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
+ STimeWindowAggSupp twAggSup;
+ SArray* pPrevValues; // SArray used to keep the previous not null value for interpolation.
+} SIntervalAggOperatorInfo;
+
+typedef struct SMergeAlignedIntervalAggOperatorInfo {
+ SIntervalAggOperatorInfo* intervalAggOperatorInfo;
+
+ uint64_t groupId; // current groupId
+ int64_t curTs; // current ts
+ SSDataBlock* prefetchedBlock;
+ SResultRow* pResultRow;
+} SMergeAlignedIntervalAggOperatorInfo;
+
+typedef struct SStreamIntervalOperatorInfo {
+ SOptrBasicInfo binfo; // basic info
+ SAggSupporter aggSup; // aggregate supporter
+ SExprSupp scalarSupp; // supporter for perform scalar function
+ SGroupResInfo groupResInfo; // multiple results build supporter
+ SInterval interval; // interval info
+ int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator.
+ STimeWindowAggSupp twAggSup;
+ bool invertible;
+ bool ignoreExpiredData;
+ bool ignoreExpiredDataSaved;
+ SArray* pDelWins; // SWinRes
+ int32_t delIndex;
+ SSDataBlock* pDelRes;
+ SPhysiNode* pPhyNode; // create new child
+ SHashObj* pPullDataMap;
+ SArray* pPullWins; // SPullWindowInfo
+ int32_t pullIndex;
+ SSDataBlock* pPullDataRes;
+ bool isFinal;
+ SArray* pChildren;
+ SStreamState* pState;
+ SWinKey delKey;
+ uint64_t numOfDatapack;
+ SArray* pUpdated;
+ SSHashObj* pUpdatedMap;
+ int64_t dataVersion;
+} SStreamIntervalOperatorInfo;
+
+typedef struct SDataGroupInfo {
+ uint64_t groupId;
+ int64_t numOfRows;
+ SArray* pPageList;
+} SDataGroupInfo;
+
+typedef struct SWindowRowsSup {
+ STimeWindow win;
+ TSKEY prevTs;
+ int32_t startRowIndex;
+ int32_t numOfRows;
+ uint64_t groupId;
+} SWindowRowsSup;
+
+typedef struct SResultWindowInfo {
+ void* pOutputBuf;
+ SSessionKey sessionWin;
+ bool isOutput;
+} SResultWindowInfo;
+
+typedef struct SStreamSessionAggOperatorInfo {
+ SOptrBasicInfo binfo;
+ SStreamAggSupporter streamAggSup;
+ SExprSupp scalarSupp; // supporter for perform scalar function
+ SGroupResInfo groupResInfo;
+ int32_t primaryTsIndex; // primary timestamp slot id
+ int32_t endTsIndex; // window end timestamp slot id
+ int32_t order; // current SSDataBlock scan order
+ STimeWindowAggSupp twAggSup;
+ SSDataBlock* pWinBlock; // window result
+ SSDataBlock* pDelRes; // delete result
+ SSDataBlock* pUpdateRes; // update window
+ bool returnUpdate;
+ SSHashObj* pStDeleted;
+ void* pDelIterator;
+ SArray* pChildren; // cache for children's result; final stream operator
+ SPhysiNode* pPhyNode; // create new child
+ bool isFinal;
+ bool ignoreExpiredData;
+ bool ignoreExpiredDataSaved;
+ SArray* pUpdated;
+ SSHashObj* pStUpdated;
+ int64_t dataVersion;
+} SStreamSessionAggOperatorInfo;
+
+typedef struct SStreamStateAggOperatorInfo {
+ SOptrBasicInfo binfo;
+ SStreamAggSupporter streamAggSup;
+ SExprSupp scalarSupp; // supporter for perform scalar function
+ SGroupResInfo groupResInfo;
+ int32_t primaryTsIndex; // primary timestamp slot id
+ STimeWindowAggSupp twAggSup;
+ SColumn stateCol;
+ SSDataBlock* pDelRes;
+ SSHashObj* pSeDeleted;
+ void* pDelIterator;
+ SArray* pChildren; // cache for children's result;
+ bool ignoreExpiredData;
+ bool ignoreExpiredDataSaved;
+ SArray* pUpdated;
+ SSHashObj* pSeUpdated;
+ int64_t dataVersion;
+} SStreamStateAggOperatorInfo;
+
+typedef struct SStreamPartitionOperatorInfo {
+ SOptrBasicInfo binfo;
+ SPartitionBySupporter partitionSup;
+ SExprSupp scalarSup;
+ SExprSupp tbnameCalSup;
+ SExprSupp tagCalSup;
+ SHashObj* pPartitions;
+ void* parIte;
+ void* pTbNameIte;
+ SSDataBlock* pInputDataBlock;
+ int32_t tsColIndex;
+ SSDataBlock* pDelRes;
+ SSDataBlock* pCreateTbRes;
+} SStreamPartitionOperatorInfo;
+
+typedef struct SStreamFillSupporter {
+ int32_t type; // fill type
+ SInterval interval;
+ SResultRowData prev;
+ SResultRowData cur;
+ SResultRowData next;
+ SResultRowData nextNext;
+ SFillColInfo* pAllColInfo; // fill exprs and not fill exprs
+ SExprSupp notFillExprSup;
+ int32_t numOfAllCols; // number of all exprs, including the tags columns
+ int32_t numOfFillCols;
+ int32_t numOfNotFillCols;
+ int32_t rowSize;
+ SSHashObj* pResMap;
+ bool hasDelete;
+} SStreamFillSupporter;
+
+typedef struct SStreamFillOperatorInfo {
+ SStreamFillSupporter* pFillSup;
+ SSDataBlock* pRes;
+ SSDataBlock* pSrcBlock;
+ int32_t srcRowIndex;
+ SSDataBlock* pSrcDelBlock;
+ int32_t srcDelRowIndex;
+ SSDataBlock* pDelRes;
+ SColMatchInfo matchInfo;
+ int32_t primaryTsCol;
+ int32_t primarySrcSlotId;
+ SStreamFillInfo* pFillInfo;
+} SStreamFillOperatorInfo;
+
+#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
+#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
+
+SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode);
+int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, const char* dbName, SExecTaskInfo* pTaskInfo);
+void cleanupQueriedTableScanInfo(SSchemaInfo* pSchemaInfo);
+
+void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock);
+void cleanupBasicInfo(SOptrBasicInfo* pInfo);
+
+int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr);
+void cleanupExprSupp(SExprSupp* pSup);
+
+void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs);
+
+int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
+ const char* pkey, void* pState);
+void cleanupAggSup(SAggSupporter* pAggSup);
+
+void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows);
+
+void doBuildStreamResBlock(struct SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
+ SDiskbasedBuf* pBuf);
+void doBuildResultDatablock(struct SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
+ SDiskbasedBuf* pBuf);
+
+bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo);
+bool hasSlimitOffsetInfo(SLimitInfo* pLimitInfo);
+void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo);
+void resetLimitInfoForNextGroup(SLimitInfo* pLimitInfo);
+bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
+
+void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData,
+ int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput);
+
+int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart);
+void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs,
+ struct SOperatorInfo* pOperator);
+
+STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order);
+int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
+
+extern void doDestroyExchangeOperatorInfo(void* param);
+
+void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo);
+int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock,
+ int32_t rows, const char* idStr, STableMetaCacheInfo* pCache);
+
+void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
+void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name);
+
+void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset);
+void clearResultRowInitFlag(SqlFunctionCtx* pCtx, int32_t numOfOutput);
+
+SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
+ int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
+ bool isIntervalQuery, SAggSupporter* pSup, bool keepGroup);
+
+int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
+ int32_t numOfOutput, SArray* pPseudoList);
+
+void setInputDataBlock(SExprSupp* pExprSupp, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol);
+
+int32_t checkForQueryBuf(size_t numOfTables);
+
+int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo* pTask, SReadHandle* readHandle);
+
+STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
+ int32_t order);
+int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
+ __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order);
+int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
+SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize);
+void getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId, SSessionKey* pKey);
+bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap);
+bool functionNeedToExecute(SqlFunctionCtx* pCtx);
+bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup);
+bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
+bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup);
+void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
+ uint64_t* pGp, void* pTbName);
+uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId);
+
+int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
+ SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
+
+bool groupbyTbname(SNodeList* pGroupList);
+int32_t buildDataBlockFromGroupRes(struct SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
+ SGroupResInfo* pGroupResInfo);
+int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size);
+int32_t buildSessionResultDataBlock(struct SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock,
+ SExprSupp* pSup, SGroupResInfo* pGroupResInfo);
+int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId,
+ SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup);
+int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult);
+int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize);
+void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order);
+int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos, int32_t order,
+ int64_t* pData);
+void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, uint64_t groupId,
+ SSDataBlock* pSrcBlock, int32_t rowId, SSDataBlock* pDestBlock);
+
+SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag);
+SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs);
+
+void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx,
+ SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo);
+void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset);
+void doClearBufferedBlocks(SStreamScanInfo* pInfo);
+
uint64_t calcGroupId(char* pData, int32_t len);
+
#ifdef __cplusplus
}
#endif
-#endif /*_TD_EXECUTOR_INT_H*/
\ No newline at end of file
+#endif // TDENGINE_EXECUTORINT_H
diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h
deleted file mode 100644
index 4d19b8bb7685a3020b697ac47ba5266c99f29d0f..0000000000000000000000000000000000000000
--- a/source/libs/executor/inc/executorimpl.h
+++ /dev/null
@@ -1,699 +0,0 @@
-/*
- * Copyright (c) 2019 TAOS Data, Inc.
- *
- * This program is free software: you can use, redistribute, and/or modify
- * it under the terms of the GNU Affero General Public License, version 3
- * or later ("AGPL"), as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-#ifndef TDENGINE_EXECUTORIMPL_H
-#define TDENGINE_EXECUTORIMPL_H
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-#include "os.h"
-#include "tcommon.h"
-#include "tlosertree.h"
-#include "tsort.h"
-#include "ttszip.h"
-#include "tvariant.h"
-
-#include "dataSinkMgt.h"
-#include "executil.h"
-#include "executor.h"
-#include "planner.h"
-#include "scalar.h"
-#include "taosdef.h"
-#include "tarray.h"
-#include "tfill.h"
-#include "thash.h"
-#include "tlockfree.h"
-#include "tmsg.h"
-#include "tpagedbuf.h"
-#include "tstream.h"
-#include "tstreamUpdate.h"
-
-#include "executorInt.h"
-#include "vnode.h"
-
-typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
-
-#define IS_VALID_SESSION_WIN(winInfo) ((winInfo).sessionWin.win.skey > 0)
-#define SET_SESSION_WIN_INVALID(winInfo) ((winInfo).sessionWin.win.skey = INT64_MIN)
-#define IS_INVALID_SESSION_WIN_KEY(winKey) ((winKey).win.skey <= 0)
-#define SET_SESSION_WIN_KEY_INVALID(pWinKey) ((pWinKey)->win.skey = INT64_MIN)
-
-enum {
- // when this task starts to execute, this status will set
- TASK_NOT_COMPLETED = 0x1u,
-
- /* Task is over
- * 1. this status is used in one row result query process, e.g., count/sum/first/last/ avg...etc.
- * 2. when all data within queried time window, it is also denoted as query_completed
- */
- TASK_COMPLETED = 0x2u,
-};
-
-/**
- * If the number of generated results is greater than this value,
- * query query will be halt and return results to client immediate.
- */
-typedef struct SResultInfo { // TODO refactor
- int64_t totalRows; // total generated result size in rows
- int64_t totalBytes; // total results in bytes.
- int32_t capacity; // capacity of current result output buffer
- int32_t threshold; // result size threshold in rows.
-} SResultInfo;
-
-typedef struct STableQueryInfo {
- TSKEY lastKey; // last check ts, todo remove it later
- SResultRowPosition pos; // current active time window
-} STableQueryInfo;
-
-typedef struct SLimit {
- int64_t limit;
- int64_t offset;
-} SLimit;
-
-typedef struct STableScanAnalyzeInfo SFileBlockLoadRecorder;
-
-enum {
- STREAM_RECOVER_STEP__NONE = 0,
- STREAM_RECOVER_STEP__PREPARE1,
- STREAM_RECOVER_STEP__PREPARE2,
- STREAM_RECOVER_STEP__SCAN1,
- STREAM_RECOVER_STEP__SCAN2,
-};
-
-typedef struct {
- STqOffsetVal currentOffset; // for tmq
- SMqMetaRsp metaRsp; // for tmq fetching meta
- int64_t snapshotVer;
- SPackedData submit;
- SSchemaWrapper* schema;
- char tbName[TSDB_TABLE_NAME_LEN];
- int8_t recoverStep;
- int8_t recoverScanFinished;
- SQueryTableDataCond tableCond;
- int64_t fillHistoryVer1;
- int64_t fillHistoryVer2;
- SStreamState* pState;
- int64_t dataVersion;
- int64_t checkPointId;
-} SStreamTaskInfo;
-
-typedef struct {
- char* tablename;
- char* dbname;
- int32_t tversion;
- SSchemaWrapper* sw;
- SSchemaWrapper* qsw;
-} SSchemaInfo;
-
-typedef struct SExchangeOpStopInfo {
- int32_t operatorType;
- int64_t refId;
-} SExchangeOpStopInfo;
-
-typedef struct SExprSupp {
- SExprInfo* pExprInfo;
- int32_t numOfExprs; // the number of scalar expression in group operator
- SqlFunctionCtx* pCtx;
- int32_t* rowEntryInfoOffset; // offset value for each row result cell info
- SFilterInfo* pFilterInfo;
-} SExprSupp;
-
-typedef enum {
- EX_SOURCE_DATA_NOT_READY = 0x1,
- EX_SOURCE_DATA_READY = 0x2,
- EX_SOURCE_DATA_EXHAUSTED = 0x3,
-} EX_SOURCE_STATUS;
-
-#define COL_MATCH_FROM_COL_ID 0x1
-#define COL_MATCH_FROM_SLOT_ID 0x2
-
-typedef struct SLoadRemoteDataInfo {
- uint64_t totalSize; // total load bytes from remote
- uint64_t totalRows; // total number of rows
- uint64_t totalElapsed; // total elapsed time
-} SLoadRemoteDataInfo;
-
-typedef struct SLimitInfo {
- SLimit limit;
- SLimit slimit;
- uint64_t currentGroupId;
- int64_t remainGroupOffset;
- int64_t numOfOutputGroups;
- int64_t remainOffset;
- int64_t numOfOutputRows;
-} SLimitInfo;
-
-typedef struct SExchangeInfo {
- SArray* pSources;
- SArray* pSourceDataInfo;
- tsem_t ready;
- void* pTransporter;
-
- // SArray, result block list, used to keep the multi-block that
- // passed by downstream operator
- SArray* pResultBlockList;
- SArray* pRecycledBlocks; // build a pool for small data block to avoid to repeatly create and then destroy.
- SSDataBlock* pDummyBlock; // dummy block, not keep data
- bool seqLoadData; // sequential load data or not, false by default
- int32_t current;
- SLoadRemoteDataInfo loadInfo;
- uint64_t self;
- SLimitInfo limitInfo;
- int64_t openedTs; // start exec time stamp, todo: move to SLoadRemoteDataInfo
-} SExchangeInfo;
-
-typedef struct SScanInfo {
- int32_t numOfAsc;
- int32_t numOfDesc;
-} SScanInfo;
-
-typedef struct SSampleExecInfo {
- double sampleRatio; // data block sample ratio, 1 by default
- uint32_t seed; // random seed value
-} SSampleExecInfo;
-
-enum {
- TABLE_SCAN__TABLE_ORDER = 1,
- TABLE_SCAN__BLOCK_ORDER = 2,
-};
-
-typedef struct SAggSupporter {
- SSHashObj* pResultRowHashTable; // quick locate the window object for each result
- char* keyBuf; // window key buffer
- SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
- int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
- int32_t currentPageId; // current write page id
-} SAggSupporter;
-
-typedef struct {
- // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if
- // current data block needs to be loaded.
- SInterval interval;
- SAggSupporter* pAggSup;
- SExprSupp* pExprSup; // expr supporter of aggregate operator
-} SAggOptrPushDownInfo;
-
-typedef struct STableMetaCacheInfo {
- SLRUCache* pTableMetaEntryCache; // 100 by default
- uint64_t metaFetch;
- uint64_t cacheHit;
-} STableMetaCacheInfo;
-
-typedef struct STableScanBase {
- STsdbReader* dataReader;
- SFileBlockLoadRecorder readRecorder;
- SQueryTableDataCond cond;
- SAggOptrPushDownInfo pdInfo;
- SColMatchInfo matchInfo;
- SReadHandle readHandle;
- SExprSupp pseudoSup;
- STableMetaCacheInfo metaCache;
- int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
- int32_t dataBlockLoadFlag;
- SLimitInfo limitInfo;
- // there are more than one table list exists in one task, if only one vnode exists.
- STableListInfo* pTableListInfo;
-} STableScanBase;
-
-typedef struct STableScanInfo {
- STableScanBase base;
- SScanInfo scanInfo;
- int32_t scanTimes;
- SSDataBlock* pResBlock;
- SSampleExecInfo sample; // sample execution info
- int32_t currentGroupId;
- int32_t currentTable;
- int8_t scanMode;
- int8_t assignBlockUid;
- bool hasGroupByTag;
- bool countOnly;
-} STableScanInfo;
-
-typedef struct STableMergeScanInfo {
- int32_t tableStartIndex;
- int32_t tableEndIndex;
- bool hasGroupId;
- uint64_t groupId;
- SArray* queryConds; // array of queryTableDataCond
- STableScanBase base;
- int32_t bufPageSize;
- uint32_t sortBufSize; // max buffer size for in-memory sort
- SArray* pSortInfo;
- SSortHandle* pSortHandle;
- SSDataBlock* pSortInputBlock;
- int64_t startTs; // sort start time
- SArray* sortSourceParams;
- SLimitInfo limitInfo;
- int64_t numOfRows;
- SScanInfo scanInfo;
- int32_t scanTimes;
- SSDataBlock* pResBlock;
- SSampleExecInfo sample; // sample execution info
- SSortExecInfo sortExecInfo;
-} STableMergeScanInfo;
-
-typedef struct STagScanInfo {
- SColumnInfo* pCols;
- SSDataBlock* pRes;
- SColMatchInfo matchInfo;
- int32_t curPos;
- SReadHandle readHandle;
- STableListInfo* pTableListInfo;
-} STagScanInfo;
-
-typedef enum EStreamScanMode {
- STREAM_SCAN_FROM_READERHANDLE = 1,
- STREAM_SCAN_FROM_RES,
- STREAM_SCAN_FROM_UPDATERES,
- STREAM_SCAN_FROM_DELETE_DATA,
- STREAM_SCAN_FROM_DATAREADER_RETRIEVE,
- STREAM_SCAN_FROM_DATAREADER_RANGE,
-} EStreamScanMode;
-
-enum {
- PROJECT_RETRIEVE_CONTINUE = 0x1,
- PROJECT_RETRIEVE_DONE = 0x2,
-};
-
-typedef struct SStreamAggSupporter {
- int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
- SSDataBlock* pScanBlock;
- SStreamState* pState;
- int64_t gap; // stream session window gap
- SqlFunctionCtx* pDummyCtx; // for combine
- SSHashObj* pResultRows;
- int32_t stateKeySize;
- int16_t stateKeyType;
- SDiskbasedBuf* pResultBuf;
-} SStreamAggSupporter;
-
-typedef struct SWindowSupporter {
- SStreamAggSupporter* pStreamAggSup;
- int64_t gap;
- uint16_t parentType;
- SAggSupporter* pIntervalAggSup;
-} SWindowSupporter;
-
-typedef struct SPartitionBySupporter {
- SArray* pGroupCols; // group by columns, SArray
- SArray* pGroupColVals; // current group column values, SArray
- char* keyBuf; // group by keys for hash
- bool needCalc; // partition by column
-} SPartitionBySupporter;
-
-typedef struct SPartitionDataInfo {
- uint64_t groupId;
- char* tbname;
- SArray* tags;
- SArray* rowIds;
-} SPartitionDataInfo;
-
-typedef struct STimeWindowAggSupp {
- int8_t calTrigger;
- int8_t calTriggerSaved;
- int64_t deleteMark;
- int64_t deleteMarkSaved;
- int64_t waterMark;
- TSKEY maxTs;
- TSKEY minTs;
- SColumnInfoData timeWindowData; // query time window info for scalar function execution.
-} STimeWindowAggSupp;
-
-typedef struct SStreamScanInfo {
- SExprInfo* pPseudoExpr;
- int32_t numOfPseudoExpr;
- SExprSupp tbnameCalSup;
- SExprSupp tagCalSup;
- int32_t primaryTsIndex; // primary time stamp slot id
- SReadHandle readHandle;
- SInterval interval; // if the upstream is an interval operator, the interval info is also kept here.
- SColMatchInfo matchInfo;
-
- SArray* pBlockLists; // multiple SSDatablock.
- SSDataBlock* pRes; // result SSDataBlock
- SSDataBlock* pUpdateRes; // update SSDataBlock
- int32_t updateResIndex;
- int32_t blockType; // current block type
- int32_t validBlockIndex; // Is current data has returned?
- uint64_t numOfExec; // execution times
- STqReader* tqReader;
-
- uint64_t groupId;
- SUpdateInfo* pUpdateInfo;
-
- EStreamScanMode scanMode;
- struct SOperatorInfo* pStreamScanOp;
- struct SOperatorInfo* pTableScanOp;
- SArray* childIds;
- SWindowSupporter windowSup;
- SPartitionBySupporter partitionSup;
- SExprSupp* pPartScalarSup;
- bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA.
- int32_t scanWinIndex; // for state operator
- int32_t pullDataResIndex;
- SSDataBlock* pPullDataRes; // pull data SSDataBlock
- SSDataBlock* pDeleteDataRes; // delete data SSDataBlock
- int32_t deleteDataIndex;
- STimeWindow updateWin;
- STimeWindowAggSupp twAggSup;
- SSDataBlock* pUpdateDataRes;
- // status for tmq
- SNodeList* pGroupTags;
- SNode* pTagCond;
- SNode* pTagIndexCond;
-
- // recover
- int32_t blockRecoverContiCnt;
- int32_t blockRecoverTotCnt;
- SSDataBlock* pRecoverRes;
-
- SSDataBlock* pCreateTbRes;
- int8_t igCheckUpdate;
- int8_t igExpired;
-} SStreamScanInfo;
-
-typedef struct {
- SVnode* vnode;
- SSDataBlock pRes; // result SSDataBlock
- STsdbReader* dataReader;
- SSnapContext* sContext;
- STableListInfo* pTableListInfo;
-} SStreamRawScanInfo;
-
-typedef struct STableCountScanSupp {
- int16_t dbNameSlotId;
- int16_t stbNameSlotId;
- int16_t tbCountSlotId;
- bool groupByDbName;
- bool groupByStbName;
- char dbNameFilter[TSDB_DB_NAME_LEN];
- char stbNameFilter[TSDB_TABLE_NAME_LEN];
-} STableCountScanSupp;
-
-typedef struct SOptrBasicInfo {
- SResultRowInfo resultRowInfo;
- SSDataBlock* pRes;
- bool mergeResultBlock;
-} SOptrBasicInfo;
-
-typedef struct SIntervalAggOperatorInfo {
- SOptrBasicInfo binfo; // basic info
- SAggSupporter aggSup; // aggregate supporter
- SExprSupp scalarSupp; // supporter for perform scalar function
- SGroupResInfo groupResInfo; // multiple results build supporter
- SInterval interval; // interval info
- int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator.
- STimeWindow win; // query time range
- bool timeWindowInterpo; // interpolation needed or not
- SArray* pInterpCols; // interpolation columns
- int32_t resultTsOrder; // result timestamp order
- int32_t inputOrder; // input data ts order
- EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
- STimeWindowAggSupp twAggSup;
- SArray* pPrevValues; // SArray used to keep the previous not null value for interpolation.
-} SIntervalAggOperatorInfo;
-
-typedef struct SMergeAlignedIntervalAggOperatorInfo {
- SIntervalAggOperatorInfo* intervalAggOperatorInfo;
-
- uint64_t groupId; // current groupId
- int64_t curTs; // current ts
- SSDataBlock* prefetchedBlock;
- SResultRow* pResultRow;
-} SMergeAlignedIntervalAggOperatorInfo;
-
-typedef struct SStreamIntervalOperatorInfo {
- SOptrBasicInfo binfo; // basic info
- SAggSupporter aggSup; // aggregate supporter
- SExprSupp scalarSupp; // supporter for perform scalar function
- SGroupResInfo groupResInfo; // multiple results build supporter
- SInterval interval; // interval info
- int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator.
- STimeWindowAggSupp twAggSup;
- bool invertible;
- bool ignoreExpiredData;
- bool ignoreExpiredDataSaved;
- SArray* pDelWins; // SWinRes
- int32_t delIndex;
- SSDataBlock* pDelRes;
- SPhysiNode* pPhyNode; // create new child
- SHashObj* pPullDataMap;
- SArray* pPullWins; // SPullWindowInfo
- int32_t pullIndex;
- SSDataBlock* pPullDataRes;
- bool isFinal;
- SArray* pChildren;
- SStreamState* pState;
- SWinKey delKey;
- uint64_t numOfDatapack;
- SArray* pUpdated;
- SSHashObj* pUpdatedMap;
- int64_t dataVersion;
-} SStreamIntervalOperatorInfo;
-
-typedef struct SDataGroupInfo {
- uint64_t groupId;
- int64_t numOfRows;
- SArray* pPageList;
-} SDataGroupInfo;
-
-typedef struct SWindowRowsSup {
- STimeWindow win;
- TSKEY prevTs;
- int32_t startRowIndex;
- int32_t numOfRows;
- uint64_t groupId;
-} SWindowRowsSup;
-
-typedef struct SResultWindowInfo {
- void* pOutputBuf;
- SSessionKey sessionWin;
- bool isOutput;
-} SResultWindowInfo;
-
-typedef struct SStateWindowInfo {
- SResultWindowInfo winInfo;
- SStateKeys* pStateKey;
-} SStateWindowInfo;
-
-typedef struct SStreamSessionAggOperatorInfo {
- SOptrBasicInfo binfo;
- SStreamAggSupporter streamAggSup;
- SExprSupp scalarSupp; // supporter for perform scalar function
- SGroupResInfo groupResInfo;
- int32_t primaryTsIndex; // primary timestamp slot id
- int32_t endTsIndex; // window end timestamp slot id
- int32_t order; // current SSDataBlock scan order
- STimeWindowAggSupp twAggSup;
- SSDataBlock* pWinBlock; // window result
- SSDataBlock* pDelRes; // delete result
- SSDataBlock* pUpdateRes; // update window
- bool returnUpdate;
- SSHashObj* pStDeleted;
- void* pDelIterator;
- SArray* pChildren; // cache for children's result; final stream operator
- SPhysiNode* pPhyNode; // create new child
- bool isFinal;
- bool ignoreExpiredData;
- bool ignoreExpiredDataSaved;
- SArray* pUpdated;
- SSHashObj* pStUpdated;
- int64_t dataVersion;
-} SStreamSessionAggOperatorInfo;
-
-typedef struct SStreamStateAggOperatorInfo {
- SOptrBasicInfo binfo;
- SStreamAggSupporter streamAggSup;
- SExprSupp scalarSupp; // supporter for perform scalar function
- SGroupResInfo groupResInfo;
- int32_t primaryTsIndex; // primary timestamp slot id
- STimeWindowAggSupp twAggSup;
- SColumn stateCol;
- SSDataBlock* pDelRes;
- SSHashObj* pSeDeleted;
- void* pDelIterator;
- SArray* pChildren; // cache for children's result;
- bool ignoreExpiredData;
- bool ignoreExpiredDataSaved;
- SArray* pUpdated;
- SSHashObj* pSeUpdated;
- int64_t dataVersion;
-} SStreamStateAggOperatorInfo;
-
-typedef struct SStreamPartitionOperatorInfo {
- SOptrBasicInfo binfo;
- SPartitionBySupporter partitionSup;
- SExprSupp scalarSup;
- SExprSupp tbnameCalSup;
- SExprSupp tagCalSup;
- SHashObj* pPartitions;
- void* parIte;
- void* pTbNameIte;
- SSDataBlock* pInputDataBlock;
- int32_t tsColIndex;
- SSDataBlock* pDelRes;
- SSDataBlock* pCreateTbRes;
-} SStreamPartitionOperatorInfo;
-
-typedef struct SStreamFillSupporter {
- int32_t type; // fill type
- SInterval interval;
- SResultRowData prev;
- SResultRowData cur;
- SResultRowData next;
- SResultRowData nextNext;
- SFillColInfo* pAllColInfo; // fill exprs and not fill exprs
- SExprSupp notFillExprSup;
- int32_t numOfAllCols; // number of all exprs, including the tags columns
- int32_t numOfFillCols;
- int32_t numOfNotFillCols;
- int32_t rowSize;
- SSHashObj* pResMap;
- bool hasDelete;
-} SStreamFillSupporter;
-
-typedef struct SStreamFillOperatorInfo {
- SStreamFillSupporter* pFillSup;
- SSDataBlock* pRes;
- SSDataBlock* pSrcBlock;
- int32_t srcRowIndex;
- SSDataBlock* pSrcDelBlock;
- int32_t srcDelRowIndex;
- SSDataBlock* pDelRes;
- SColMatchInfo matchInfo;
- int32_t primaryTsCol;
- int32_t primarySrcSlotId;
- SStreamFillInfo* pFillInfo;
-} SStreamFillOperatorInfo;
-
-#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
-#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
-
-SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode);
-int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, const char* dbName, SExecTaskInfo* pTaskInfo);
-
-void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock);
-void cleanupBasicInfo(SOptrBasicInfo* pInfo);
-
-int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr);
-void cleanupExprSupp(SExprSupp* pSup);
-
-void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs);
-
-int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
- const char* pkey, void* pState);
-void cleanupAggSup(SAggSupporter* pAggSup);
-
-void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows);
-
-void doBuildStreamResBlock(struct SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
- SDiskbasedBuf* pBuf);
-void doBuildResultDatablock(struct SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
- SDiskbasedBuf* pBuf);
-
-bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo);
-bool hasSlimitOffsetInfo(SLimitInfo* pLimitInfo);
-void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo);
-void resetLimitInfoForNextGroup(SLimitInfo* pLimitInfo);
-bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
-
-void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData,
- int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput);
-
-int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart);
-void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs,
- struct SOperatorInfo* pOperator);
-
-STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order);
-
-int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
-
-extern void doDestroyExchangeOperatorInfo(void* param);
-
-void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo);
-int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock,
- int32_t rows, const char* idStr, STableMetaCacheInfo* pCache);
-
-void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
-void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name);
-
-void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset);
-void clearResultRowInitFlag(SqlFunctionCtx* pCtx, int32_t numOfOutput);
-
-SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
- int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
- bool isIntervalQuery, SAggSupporter* pSup, bool keepGroup);
-
-int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
- int32_t numOfOutput, SArray* pPseudoList);
-
-void setInputDataBlock(SExprSupp* pExprSupp, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol);
-
-int32_t checkForQueryBuf(size_t numOfTables);
-
-SArray* getTableListInfo(const SExecTaskInfo* pTaskInfo);
-
-int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo* pTask, SReadHandle* readHandle);
-int32_t getOperatorExplainExecInfo(struct SOperatorInfo* operatorInfo, SArray* pExecInfoList);
-
-STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
- int32_t order);
-int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
- __block_search_fn_t searchFn, STableQueryInfo* item, int32_t order);
-int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
-SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize);
-void getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId, SSessionKey* pKey);
-bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap);
-bool functionNeedToExecute(SqlFunctionCtx* pCtx);
-bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup);
-bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
-bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup);
-void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
- uint64_t* pGp, void* pTbName);
-uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId);
-
-int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
- SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
-
-bool groupbyTbname(SNodeList* pGroupList);
-int32_t buildDataBlockFromGroupRes(struct SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
- SGroupResInfo* pGroupResInfo);
-int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size);
-int32_t buildSessionResultDataBlock(struct SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock,
- SExprSupp* pSup, SGroupResInfo* pGroupResInfo);
-int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId,
- SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup);
-int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult);
-int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize);
-void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order);
-int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos, int32_t order,
- int64_t* pData);
-void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, uint64_t groupId,
- SSDataBlock* pSrcBlock, int32_t rowId, SSDataBlock* pDestBlock);
-
-SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag);
-SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs);
-
-void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx,
- SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo);
-void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset);
-void doClearBufferedBlocks(SStreamScanInfo* pInfo);
-
-#ifdef __cplusplus
-}
-#endif
-
-#endif // TDENGINE_EXECUTORIMPL_H
diff --git a/source/libs/executor/inc/operator.h b/source/libs/executor/inc/operator.h
index 92018b6e3e9c3bd2d827e9ca50fd13b6e20dee8f..632b817a0754d2d8d4482200df5ac28e18bcf02c 100644
--- a/source/libs/executor/inc/operator.h
+++ b/source/libs/executor/inc/operator.h
@@ -157,6 +157,7 @@ void destroyOperator(SOperatorInfo* pOperator);
SOperatorInfo* extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, const char* id);
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder);
int32_t stopTableScanOperator(SOperatorInfo* pOperator, const char* pIdStr);
+int32_t getOperatorExplainExecInfo(struct SOperatorInfo* operatorInfo, SArray* pExecInfoList);
#ifdef __cplusplus
}
diff --git a/source/libs/executor/inc/querytask.h b/source/libs/executor/inc/querytask.h
index aa42cd2c14ccdb76ded582dd47786a871988f3e2..8852265da0dedacd453ae2e362de930186ee017e 100644
--- a/source/libs/executor/inc/querytask.h
+++ b/source/libs/executor/inc/querytask.h
@@ -22,6 +22,17 @@ extern "C" {
#define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.str)
+enum {
+ // when this task starts to execute, this status will set
+ TASK_NOT_COMPLETED = 0x1u,
+
+ /* Task is over
+ * 1. this status is used in one row result query process, e.g., count/sum/first/last/ avg...etc.
+ * 2. when all data within queried time window, it is also denoted as query_completed
+ */
+ TASK_COMPLETED = 0x2u,
+};
+
typedef struct STaskIdInfo {
uint64_t queryId; // this is also a request id
uint64_t subplanId;
@@ -44,6 +55,23 @@ typedef struct STaskStopInfo {
SArray* pStopInfo;
} STaskStopInfo;
+typedef struct {
+ STqOffsetVal currentOffset; // for tmq
+ SMqMetaRsp metaRsp; // for tmq fetching meta
+ int64_t snapshotVer;
+ SPackedData submit; // todo remove it
+ SSchemaWrapper* schema;
+ char tbName[TSDB_TABLE_NAME_LEN]; // this is the current scan table: todo refactor
+ int8_t recoverStep;
+ int8_t recoverScanFinished;
+ SQueryTableDataCond tableCond;
+ int64_t fillHistoryVer1;
+ int64_t fillHistoryVer2;
+ SStreamState* pState;
+ int64_t dataVersion;
+ int64_t checkPointId;
+} SStreamTaskInfo;
+
struct SExecTaskInfo {
STaskIdInfo id;
uint32_t status;
@@ -75,6 +103,7 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
int32_t vgId, char* sql, EOPTR_EXEC_MODEL model);
int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo);
+SArray* getTableListInfo(const SExecTaskInfo* pTaskInfo);
#ifdef __cplusplus
}
diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c
index c00c167d8e03b5db61f31ede8e8bab5f95ccfd76..9b463a3dee630d089750f424c572163f532d500e 100644
--- a/source/libs/executor/src/aggregateoperator.c
+++ b/source/libs/executor/src/aggregateoperator.c
@@ -20,16 +20,16 @@
#include "tfill.h"
#include "tname.h"
-#include "tdatablock.h"
-#include "tglobal.h"
-#include "executorimpl.h"
+#include "executorInt.h"
#include "index.h"
+#include "operator.h"
#include "query.h"
+#include "querytask.h"
#include "tcompare.h"
+#include "tdatablock.h"
+#include "tglobal.h"
#include "thash.h"
#include "ttypes.h"
-#include "operator.h"
-#include "querytask.h"
typedef struct {
bool hasAgg;
diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c
index 6fee0899ced11687ea70684beae95bf7deee3b39..eec34a640675015994294248dcef3e5c9af90b86 100644
--- a/source/libs/executor/src/cachescanoperator.c
+++ b/source/libs/executor/src/cachescanoperator.c
@@ -20,12 +20,12 @@
#include "tdatablock.h"
#include "tmsg.h"
-#include "executorimpl.h"
+#include "executorInt.h"
+#include "operator.h"
+#include "querytask.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
-#include "operator.h"
-#include "querytask.h"
typedef struct SCacheRowsScanInfo {
SSDataBlock* pRes;
diff --git a/source/libs/executor/src/dataDeleter.c b/source/libs/executor/src/dataDeleter.c
index 96d061fc049f3cea16d8337f8fbdc51746068a42..11074b0e94b183612ec58004581ed696d5febf05 100644
--- a/source/libs/executor/src/dataDeleter.c
+++ b/source/libs/executor/src/dataDeleter.c
@@ -15,7 +15,7 @@
#include "dataSinkInt.h"
#include "dataSinkMgt.h"
-#include "executorimpl.h"
+#include "executorInt.h"
#include "planner.h"
#include "tcompression.h"
#include "tdatablock.h"
diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c
index 8e32559fac33ff875309f91b0eab35f362d31253..ce8dc898a5e11ecbef34a0fe0e2353466b53804d 100644
--- a/source/libs/executor/src/dataDispatcher.c
+++ b/source/libs/executor/src/dataDispatcher.c
@@ -15,7 +15,7 @@
#include "dataSinkInt.h"
#include "dataSinkMgt.h"
-#include "executorimpl.h"
+#include "executorInt.h"
#include "planner.h"
#include "tcompression.h"
#include "tdatablock.h"
diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c
index 22f388d40675243348a52d5d548d6d0ee2dc3ddd..33eccf47593f1c17dbcd01c3f932844bb3d76180 100644
--- a/source/libs/executor/src/dataInserter.c
+++ b/source/libs/executor/src/dataInserter.c
@@ -15,7 +15,7 @@
#include "dataSinkInt.h"
#include "dataSinkMgt.h"
-#include "executorimpl.h"
+#include "executorInt.h"
#include "planner.h"
#include "tcompression.h"
#include "tdatablock.h"
diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c
index 8ec7ed91a38dfdd801a50ee8dddce525e329921f..956d5b714d1fbf98f7aec024605befc31621eeef 100644
--- a/source/libs/executor/src/eventwindowoperator.c
+++ b/source/libs/executor/src/eventwindowoperator.c
@@ -13,16 +13,16 @@
* along with this program. If not, see .
*/
-#include "executorimpl.h"
+#include "executorInt.h"
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
+#include "operator.h"
+#include "querytask.h"
#include "tcommon.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "ttime.h"
-#include "operator.h"
-#include "querytask.h"
typedef struct SEventWindowOperatorInfo {
SOptrBasicInfo binfo;
diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c
index 31445dd5683fe28624c546abab21a5095827c273..94041140d4e60030a4b93a1d6a6b7b4df9eacd85 100644
--- a/source/libs/executor/src/exchangeoperator.c
+++ b/source/libs/executor/src/exchangeoperator.c
@@ -13,19 +13,19 @@
* along with this program. If not, see .
*/
+#include "executorInt.h"
#include "filter.h"
#include "function.h"
-#include "os.h"
-#include "tname.h"
-#include "tref.h"
-#include "tdatablock.h"
-#include "tmsg.h"
-#include "executorimpl.h"
#include "index.h"
-#include "query.h"
-#include "thash.h"
#include "operator.h"
+#include "os.h"
+#include "query.h"
#include "querytask.h"
+#include "tdatablock.h"
+#include "thash.h"
+#include "tmsg.h"
+#include "tname.h"
+#include "tref.h"
typedef struct SFetchRspHandleWrapper {
uint32_t exchangeId;
diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c
index 75e1d374e40f5470d7229d46761daa6ff2a52bf4..c51dc39b5b06bfa7eda2481905a8a8427714fe68 100644
--- a/source/libs/executor/src/executil.c
+++ b/source/libs/executor/src/executil.c
@@ -24,9 +24,9 @@
#include "ttime.h"
#include "executil.h"
-#include "executorimpl.h"
-#include "tcompression.h"
+#include "executorInt.h"
#include "querytask.h"
+#include "tcompression.h"
typedef struct STableListIdInfo {
uint64_t suid;
diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c
index 2b9ab6c09a270ad30b81aaf643b84e24ab0faa37..7a5715e5eda8bc02ede6f16ba6f1a0aad264c976 100644
--- a/source/libs/executor/src/executor.c
+++ b/source/libs/executor/src/executor.c
@@ -14,14 +14,14 @@
*/
#include "executor.h"
-#include "executorimpl.h"
+#include "executorInt.h"
+#include "operator.h"
#include "planner.h"
+#include "querytask.h"
#include "tdatablock.h"
#include "tref.h"
#include "tudf.h"
#include "vnode.h"
-#include "operator.h"
-#include "querytask.h"
static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT;
int32_t exchangeObjRefPool = -1;
@@ -718,8 +718,6 @@ void qRemoveTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo) {
taosArrayRemove(pTaskInfo->stopInfo.pStopInfo, idx);
}
taosWUnLockLatch(&pTaskInfo->stopInfo.lock);
-
- return;
}
void qStopTaskOperators(SExecTaskInfo* pTaskInfo) {
@@ -1304,3 +1302,25 @@ SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo) {
taosArrayDestroy(plist);
return pUidList;
}
+
+static void extractTableList(SArray* pList, const SOperatorInfo* pOperator) {
+ if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
+ SStreamScanInfo* pScanInfo = pOperator->info;
+ STableScanInfo* pTableScanInfo = pScanInfo->pTableScanOp->info;
+ taosArrayPush(pList, &pTableScanInfo->base.pTableListInfo);
+ } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
+ STableScanInfo* pScanInfo = pOperator->info;
+ taosArrayPush(pList, &pScanInfo->base.pTableListInfo);
+ } else {
+ if (pOperator->pDownstream != NULL && pOperator->pDownstream[0] != NULL) {
+ extractTableList(pList, pOperator->pDownstream[0]);
+ }
+ }
+}
+
+SArray* getTableListInfo(const SExecTaskInfo* pTaskInfo) {
+ SArray* pArray = taosArrayInit(0, POINTER_BYTES);
+ SOperatorInfo* pOperator = pTaskInfo->pRoot;
+ extractTableList(pArray, pOperator);
+ return pArray;
+}
\ No newline at end of file
diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorInt.c
similarity index 95%
rename from source/libs/executor/src/executorimpl.c
rename to source/libs/executor/src/executorInt.c
index 3ab929cad0816202162433827f257a80b76b0fb7..f525f6728cdef497717c75b87bd7829af3911a60 100644
--- a/source/libs/executor/src/executorimpl.c
+++ b/source/libs/executor/src/executorInt.c
@@ -25,15 +25,15 @@
#include "tmsg.h"
#include "ttime.h"
-#include "executorimpl.h"
+#include "executorInt.h"
#include "index.h"
+#include "operator.h"
#include "query.h"
+#include "querytask.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
#include "vnode.h"
-#include "operator.h"
-#include "querytask.h"
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
@@ -1059,37 +1059,6 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo*
return TSDB_CODE_SUCCESS;
}
-int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList) {
- SExplainExecInfo execInfo = {0};
- SExplainExecInfo* pExplainInfo = taosArrayPush(pExecInfoList, &execInfo);
-
- pExplainInfo->numOfRows = operatorInfo->resultInfo.totalRows;
- pExplainInfo->startupCost = operatorInfo->cost.openCost;
- pExplainInfo->totalCost = operatorInfo->cost.totalCost;
- pExplainInfo->verboseLen = 0;
- pExplainInfo->verboseInfo = NULL;
-
- if (operatorInfo->fpSet.getExplainFn) {
- int32_t code =
- operatorInfo->fpSet.getExplainFn(operatorInfo, &pExplainInfo->verboseInfo, &pExplainInfo->verboseLen);
- if (code) {
- qError("%s operator getExplainFn failed, code:%s", GET_TASKID(operatorInfo->pTaskInfo), tstrerror(code));
- return code;
- }
- }
-
- int32_t code = 0;
- for (int32_t i = 0; i < operatorInfo->numOfDownstream; ++i) {
- code = getOperatorExplainExecInfo(operatorInfo->pDownstream[i], pExecInfoList);
- if (code != TSDB_CODE_SUCCESS) {
- // taosMemoryFreeClear(*pRes);
- return TSDB_CODE_OUT_OF_MEMORY;
- }
- }
-
- return TSDB_CODE_SUCCESS;
-}
-
int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId,
SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup) {
SWinKey key = {
@@ -1331,25 +1300,3 @@ void qStreamCloseTsdbReader(void* task) {
}
}
}
-
-static void extractTableList(SArray* pList, const SOperatorInfo* pOperator) {
- if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
- SStreamScanInfo* pScanInfo = pOperator->info;
- STableScanInfo* pTableScanInfo = pScanInfo->pTableScanOp->info;
- taosArrayPush(pList, &pTableScanInfo->base.pTableListInfo);
- } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
- STableScanInfo* pScanInfo = pOperator->info;
- taosArrayPush(pList, &pScanInfo->base.pTableListInfo);
- } else {
- if (pOperator->pDownstream != NULL && pOperator->pDownstream[0] != NULL) {
- extractTableList(pList, pOperator->pDownstream[0]);
- }
- }
-}
-
-SArray* getTableListInfo(const SExecTaskInfo* pTaskInfo) {
- SArray* pArray = taosArrayInit(0, POINTER_BYTES);
- SOperatorInfo* pOperator = pTaskInfo->pRoot;
- extractTableList(pArray, pOperator);
- return pArray;
-}
\ No newline at end of file
diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c
index e465e6a52518dc0688ed8cded6b1ebde9b556859..0ac9e6097f1647bdd0f39a26125fdbf1181c165a 100644
--- a/source/libs/executor/src/filloperator.c
+++ b/source/libs/executor/src/filloperator.c
@@ -20,7 +20,7 @@
#include "tmsg.h"
#include "ttypes.h"
-#include "executorimpl.h"
+#include "executorInt.h"
#include "tcommon.h"
#include "thash.h"
#include "ttime.h"
diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c
index 821db4244ed396bebff1fc45234fa075efe25ecc..47338d44696ffca98494ab075a73114fcfc0b8bc 100644
--- a/source/libs/executor/src/groupoperator.c
+++ b/source/libs/executor/src/groupoperator.c
@@ -22,12 +22,11 @@
#include "tmsg.h"
#include "executorInt.h"
-#include "executorimpl.h"
+#include "operator.h"
+#include "querytask.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
-#include "operator.h"
-#include "querytask.h"
typedef struct SGroupbyOperatorInfo {
SOptrBasicInfo binfo;
diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c
index b4862a490fb540a143b2c780bd6283b01bbe0b6f..754b5f47373e8444c55c9eb64c6ab42694c5142d 100644
--- a/source/libs/executor/src/joinoperator.c
+++ b/source/libs/executor/src/joinoperator.c
@@ -13,18 +13,18 @@
* along with this program. If not, see .
*/
+#include "executorInt.h"
#include "filter.h"
-#include "executorimpl.h"
#include "function.h"
+#include "operator.h"
#include "os.h"
#include "querynodes.h"
+#include "querytask.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "thash.h"
#include "tmsg.h"
#include "ttypes.h"
-#include "operator.h"
-#include "querytask.h"
typedef struct SJoinRowCtx {
bool rowRemains;
diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c
index 5e384ca6dd13f6b9092853139d33c65ec77d51e1..729178dc60b482fc82b0fddba5b78ce5358c216d 100644
--- a/source/libs/executor/src/operator.c
+++ b/source/libs/executor/src/operator.c
@@ -20,12 +20,12 @@
#include "tglobal.h"
-#include "executorimpl.h"
+#include "executorInt.h"
#include "index.h"
-#include "query.h"
-#include "vnode.h"
#include "operator.h"
+#include "query.h"
#include "querytask.h"
+#include "vnode.h"
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup,
__optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn,
@@ -75,28 +75,6 @@ void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, b
pOperator->pTaskInfo = pTaskInfo;
}
-void destroyOperator(SOperatorInfo* pOperator) {
- if (pOperator == NULL) {
- return;
- }
-
- if (pOperator->fpSet.closeFn != NULL) {
- pOperator->fpSet.closeFn(pOperator->info);
- }
-
- if (pOperator->pDownstream != NULL) {
- for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
- destroyOperator(pOperator->pDownstream[i]);
- }
-
- taosMemoryFreeClear(pOperator->pDownstream);
- pOperator->numOfDownstream = 0;
- }
-
- cleanupExprSupp(&pOperator->exprSupp);
- taosMemoryFreeClear(pOperator);
-}
-
// each operator should be set their own function to return total cost buffer
int32_t optrDefaultBufFn(SOperatorInfo* pOperator) {
if (pOperator->blocking) {
@@ -106,40 +84,6 @@ int32_t optrDefaultBufFn(SOperatorInfo* pOperator) {
}
}
-//int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder) {
-// // todo add more information about exchange operation
-// int32_t type = pOperator->operatorType;
-// if (type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN ||
-// type == QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN ||
-// type == QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN) {
-// *order = TSDB_ORDER_ASC;
-// *scanFlag = MAIN_SCAN;
-// return TSDB_CODE_SUCCESS;
-// } else if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) {
-// if (!inheritUsOrder) {
-// *order = TSDB_ORDER_ASC;
-// }
-// *scanFlag = MAIN_SCAN;
-// return TSDB_CODE_SUCCESS;
-// } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
-// STableScanInfo* pTableScanInfo = pOperator->info;
-// *order = pTableScanInfo->base.cond.order;
-// *scanFlag = pTableScanInfo->base.scanFlag;
-// return TSDB_CODE_SUCCESS;
-// } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN) {
-// STableMergeScanInfo* pTableScanInfo = pOperator->info;
-// *order = pTableScanInfo->base.cond.order;
-// *scanFlag = pTableScanInfo->base.scanFlag;
-// return TSDB_CODE_SUCCESS;
-// } else {
-// if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) {
-// return TSDB_CODE_INVALID_PARA;
-// } else {
-// return getTableScanInfo(pOperator->pDownstream[0], order, scanFlag, inheritUsOrder);
-// }
-// }
-//}
-
static int64_t getQuerySupportBufSize(size_t numOfTables) {
size_t s1 = sizeof(STableQueryInfo);
// size_t s3 = sizeof(STableCheckInfo); buffer consumption in tsdb
@@ -319,7 +263,7 @@ int32_t stopTableScanOperator(SOperatorInfo* pOperator, const char* pIdStr) {
}
SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond,
- SNode* pTagIndexCond, const char* pUser, const char* dbname) {
+ SNode* pTagIndexCond, const char* pUser, const char* dbname) {
int32_t type = nodeType(pPhyNode);
const char* idstr = GET_TASKID(pTaskInfo);
@@ -347,7 +291,7 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR
code = initQueriedTableSchemaInfo(pHandle, &pTableScanNode->scan, dbname, pTaskInfo);
if (code) {
- pTaskInfo->code = terrno;
+ pTaskInfo->code = code;
tableListDestroy(pTableListInfo);
return NULL;
}
@@ -355,6 +299,7 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR
pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo);
if (NULL == pOperator) {
pTaskInfo->code = terrno;
+ tableListDestroy(pTableListInfo);
return NULL;
}
@@ -578,3 +523,56 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR
return pOptr;
}
+
+void destroyOperator(SOperatorInfo* pOperator) {
+ if (pOperator == NULL) {
+ return;
+ }
+
+ if (pOperator->fpSet.closeFn != NULL) {
+ pOperator->fpSet.closeFn(pOperator->info);
+ }
+
+ if (pOperator->pDownstream != NULL) {
+ for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
+ destroyOperator(pOperator->pDownstream[i]);
+ }
+
+ taosMemoryFreeClear(pOperator->pDownstream);
+ pOperator->numOfDownstream = 0;
+ }
+
+ cleanupExprSupp(&pOperator->exprSupp);
+ taosMemoryFreeClear(pOperator);
+}
+
+int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList) {
+ SExplainExecInfo execInfo = {0};
+ SExplainExecInfo* pExplainInfo = taosArrayPush(pExecInfoList, &execInfo);
+
+ pExplainInfo->numOfRows = operatorInfo->resultInfo.totalRows;
+ pExplainInfo->startupCost = operatorInfo->cost.openCost;
+ pExplainInfo->totalCost = operatorInfo->cost.totalCost;
+ pExplainInfo->verboseLen = 0;
+ pExplainInfo->verboseInfo = NULL;
+
+ if (operatorInfo->fpSet.getExplainFn) {
+ int32_t code =
+ operatorInfo->fpSet.getExplainFn(operatorInfo, &pExplainInfo->verboseInfo, &pExplainInfo->verboseLen);
+ if (code) {
+ qError("%s operator getExplainFn failed, code:%s", GET_TASKID(operatorInfo->pTaskInfo), tstrerror(code));
+ return code;
+ }
+ }
+
+ int32_t code = 0;
+ for (int32_t i = 0; i < operatorInfo->numOfDownstream; ++i) {
+ code = getOperatorExplainExecInfo(operatorInfo->pDownstream[i], pExecInfoList);
+ if (code != TSDB_CODE_SUCCESS) {
+ // taosMemoryFreeClear(*pRes);
+ return TSDB_CODE_OUT_OF_MEMORY;
+ }
+ }
+
+ return TSDB_CODE_SUCCESS;
+}
diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c
index 01ee18b41f63631c150699faeb4b364e6f808a96..02f504bef03aaf4cff5d1193a2c23fd66b0b6146 100644
--- a/source/libs/executor/src/projectoperator.c
+++ b/source/libs/executor/src/projectoperator.c
@@ -13,7 +13,7 @@
* along with this program. If not, see .
*/
-#include "executorimpl.h"
+#include "executorInt.h"
#include "filter.h"
#include "functionMgt.h"
#include "operator.h"
diff --git a/source/libs/executor/src/querytask.c b/source/libs/executor/src/querytask.c
index ed209efed0a2412b68aa67d83130d9f103d45778..b6b250a3255f7184d7e9727def1ea4116e08652a 100644
--- a/source/libs/executor/src/querytask.c
+++ b/source/libs/executor/src/querytask.c
@@ -24,14 +24,14 @@
#include "tdatablock.h"
#include "tmsg.h"
-#include "executorimpl.h"
+#include "executorInt.h"
#include "index.h"
+#include "operator.h"
#include "query.h"
+#include "querytask.h"
#include "thash.h"
#include "ttypes.h"
#include "vnode.h"
-#include "operator.h"
-#include "querytask.h"
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c
index a717c8b7c06c550e8d7e74164760b9df96b24d38..130cca9cbb2fc1c3e1eb73afa6ab1409a586988f 100644
--- a/source/libs/executor/src/scanoperator.c
+++ b/source/libs/executor/src/scanoperator.c
@@ -13,7 +13,7 @@
* along with this program. If not, see .
*/
-#include "executorimpl.h"
+#include "executorInt.h"
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c
index 0b23a803c6348f417d6c91a204112c6d8d852e0e..10933f285c6d93fafdc86b6e0e30aca3a5e22289 100644
--- a/source/libs/executor/src/sortoperator.c
+++ b/source/libs/executor/src/sortoperator.c
@@ -13,11 +13,11 @@
* along with this program. If not, see .
*/
+#include "executorInt.h"
#include "filter.h"
-#include "executorimpl.h"
-#include "tdatablock.h"
#include "operator.h"
#include "querytask.h"
+#include "tdatablock.h"
typedef struct SSortOperatorInfo {
SOptrBasicInfo binfo;
diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c
index bc850b5333a789d98d76bc8de2f5d0ce5a22e084..c75c49fe77cfdb7ee690adb980c0be1059c59e02 100644
--- a/source/libs/executor/src/sysscanoperator.c
+++ b/source/libs/executor/src/sysscanoperator.c
@@ -13,7 +13,7 @@
* along with this program. If not, see .
*/
-#include "executorimpl.h"
+#include "executorInt.h"
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c
index 8376caace0698bac32a423898c26089262a33bd8..fc4e82b57f2e0440c19c81c715e8c731a478e7c7 100644
--- a/source/libs/executor/src/tfill.c
+++ b/source/libs/executor/src/tfill.c
@@ -20,7 +20,7 @@
#include "tmsg.h"
#include "ttypes.h"
-#include "executorimpl.h"
+#include "executorInt.h"
#include "tcommon.h"
#include "thash.h"
#include "ttime.h"
diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c
index a688920a22bb34f7d93a40a7e3e42c52b1aaf875..29e3668ec42fee8021dbd70ba9847eb072a37fc6 100644
--- a/source/libs/executor/src/timesliceoperator.c
+++ b/source/libs/executor/src/timesliceoperator.c
@@ -12,17 +12,17 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
-#include "executorimpl.h"
+#include "executorInt.h"
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
+#include "operator.h"
+#include "querytask.h"
#include "tcommon.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "tfill.h"
#include "ttime.h"
-#include "operator.h"
-#include "querytask.h"
typedef struct STimeSliceOperatorInfo {
SSDataBlock* pRes;
diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c
index ab9b0987fae795b6e484899af8b815d0173f15fa..bea01fa0d818ad8fc925b2356af15d6d30563437 100644
--- a/source/libs/executor/src/timewindowoperator.c
+++ b/source/libs/executor/src/timewindowoperator.c
@@ -12,21 +12,27 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
-#include "executorimpl.h"
+#include "executorInt.h"
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
+#include "operator.h"
+#include "querytask.h"
#include "tcommon.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "tfill.h"
#include "ttime.h"
-#include "operator.h"
-#include "querytask.h"
#define IS_FINAL_OP(op) ((op)->isFinal)
#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL);
+typedef struct SStateWindowInfo {
+ SResultWindowInfo winInfo;
+ SStateKeys* pStateKey;
+} SStateWindowInfo;
+
+
typedef struct SSessionAggOperatorInfo {
SOptrBasicInfo binfo;
SAggSupporter aggSup;
diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp
index db0678f2142dad234c844a15452af928c2d4b51c..cefe12990d53ade95ba48c3f9e380dbdb2e1f62a 100644
--- a/source/libs/executor/test/executorTests.cpp
+++ b/source/libs/executor/test/executorTests.cpp
@@ -24,13 +24,13 @@
#include "os.h"
#include "executor.h"
-#include "executorimpl.h"
+#include "executorInt.h"
#include "function.h"
+#include "operator.h"
#include "taos.h"
#include "tdatablock.h"
#include "tdef.h"
#include "tvariant.h"
-#include "operator.h"
namespace {
diff --git a/source/libs/executor/test/lhashTests.cpp b/source/libs/executor/test/lhashTests.cpp
index 24570ff7889aa09dcd160a8703be88e57bbc35c9..92f7652d8df01c8ed8cfa74ef77a5d6d84048d3a 100644
--- a/source/libs/executor/test/lhashTests.cpp
+++ b/source/libs/executor/test/lhashTests.cpp
@@ -15,7 +15,7 @@
#include
#include
-#include "executorimpl.h"
+#include "executorInt.h"
#include "tlinearhash.h"
#pragma GCC diagnostic push
diff --git a/source/libs/executor/test/sortTests.cpp b/source/libs/executor/test/sortTests.cpp
index f35d07804e0af1ba02ff6237b2959030a517725c..8122d7d6a91fed15eddce1fa74150600475043c0 100644
--- a/source/libs/executor/test/sortTests.cpp
+++ b/source/libs/executor/test/sortTests.cpp
@@ -26,7 +26,7 @@
#include "os.h"
#include "executor.h"
-#include "executorimpl.h"
+#include "executorInt.h"
#include "taos.h"
#include "tcompare.h"
#include "tdatablock.h"