提交 da0d9c78 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 3ac1ce69
......@@ -107,7 +107,7 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
if (streamTaskShouldStop(&pTask->status) || status == TASK_STATUS__RECOVER_PREPARE ||
status == TASK_STATUS__WAIT_DOWNSTREAM) {
tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr, status);
tqDebug("s-task:%s skip push data, not ready for processing, status:%d", pTask->id.idStr, status);
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
......
......@@ -130,6 +130,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
tqError("vgId:%d, task exec error since %s", pTq->pVnode->config.vgId, terrstr());
return -1;
}
tqDebug("tmqsnap task execute end, get %p", pDataBlock);
if (pDataBlock != NULL && pDataBlock->info.rows > 0) {
......
......@@ -12,14 +12,75 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_EXECUTOR_INT_H
#define _TD_EXECUTOR_INT_H
#ifndef TDENGINE_EXECUTORINT_H
#define TDENGINE_EXECUTORINT_H
#ifdef __cplusplus
extern "C" {
#endif
#include "os.h"
#include "tcommon.h"
#include "tlosertree.h"
#include "tsort.h"
#include "ttszip.h"
#include "tvariant.h"
#include "dataSinkMgt.h"
#include "executil.h"
#include "executor.h"
#include "planner.h"
#include "scalar.h"
#include "taosdef.h"
#include "tarray.h"
#include "tfill.h"
#include "thash.h"
#include "tlockfree.h"
#include "tmsg.h"
#include "tpagedbuf.h"
#include "tstream.h"
#include "tstreamUpdate.h"
#include "vnode.h"
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
#define IS_VALID_SESSION_WIN(winInfo) ((winInfo).sessionWin.win.skey > 0)
#define SET_SESSION_WIN_INVALID(winInfo) ((winInfo).sessionWin.win.skey = INT64_MIN)
#define IS_INVALID_SESSION_WIN_KEY(winKey) ((winKey).win.skey <= 0)
#define SET_SESSION_WIN_KEY_INVALID(pWinKey) ((pWinKey)->win.skey = INT64_MIN)
/**
* If the number of generated results is greater than this value,
* query query will be halt and return results to client immediate.
*/
typedef struct SResultInfo { // TODO refactor
int64_t totalRows; // total generated result size in rows
int64_t totalBytes; // total results in bytes.
int32_t capacity; // capacity of current result output buffer
int32_t threshold; // result size threshold in rows.
} SResultInfo;
typedef struct STableQueryInfo {
TSKEY lastKey; // last check ts, todo remove it later
SResultRowPosition pos; // current active time window
} STableQueryInfo;
typedef struct SLimit {
int64_t limit;
int64_t offset;
} SLimit;
typedef struct STableScanAnalyzeInfo SFileBlockLoadRecorder;
enum {
STREAM_RECOVER_STEP__NONE = 0,
STREAM_RECOVER_STEP__PREPARE1,
STREAM_RECOVER_STEP__PREPARE2,
STREAM_RECOVER_STEP__SCAN1,
STREAM_RECOVER_STEP__SCAN2,
};
extern int32_t exchangeObjRefPool;
typedef struct {
......@@ -29,9 +90,584 @@ typedef struct {
int32_t bytes;
} SGroupKeys, SStateKeys;
typedef struct {
char* tablename;
char* dbname;
int32_t tversion;
SSchemaWrapper* sw;
SSchemaWrapper* qsw;
} SSchemaInfo;
typedef struct SExchangeOpStopInfo {
int32_t operatorType;
int64_t refId;
} SExchangeOpStopInfo;
typedef struct SExprSupp {
SExprInfo* pExprInfo;
int32_t numOfExprs; // the number of scalar expression in group operator
SqlFunctionCtx* pCtx;
int32_t* rowEntryInfoOffset; // offset value for each row result cell info
SFilterInfo* pFilterInfo;
} SExprSupp;
typedef enum {
EX_SOURCE_DATA_NOT_READY = 0x1,
EX_SOURCE_DATA_READY = 0x2,
EX_SOURCE_DATA_EXHAUSTED = 0x3,
} EX_SOURCE_STATUS;
#define COL_MATCH_FROM_COL_ID 0x1
#define COL_MATCH_FROM_SLOT_ID 0x2
typedef struct SLoadRemoteDataInfo {
uint64_t totalSize; // total load bytes from remote
uint64_t totalRows; // total number of rows
uint64_t totalElapsed; // total elapsed time
} SLoadRemoteDataInfo;
typedef struct SLimitInfo {
SLimit limit;
SLimit slimit;
uint64_t currentGroupId;
int64_t remainGroupOffset;
int64_t numOfOutputGroups;
int64_t remainOffset;
int64_t numOfOutputRows;
} SLimitInfo;
typedef struct SExchangeInfo {
SArray* pSources;
SArray* pSourceDataInfo;
tsem_t ready;
void* pTransporter;
// SArray<SSDataBlock*>, result block list, used to keep the multi-block that
// passed by downstream operator
SArray* pResultBlockList;
SArray* pRecycledBlocks; // build a pool for small data block to avoid to repeatly create and then destroy.
SSDataBlock* pDummyBlock; // dummy block, not keep data
bool seqLoadData; // sequential load data or not, false by default
int32_t current;
SLoadRemoteDataInfo loadInfo;
uint64_t self;
SLimitInfo limitInfo;
int64_t openedTs; // start exec time stamp, todo: move to SLoadRemoteDataInfo
} SExchangeInfo;
typedef struct SScanInfo {
int32_t numOfAsc;
int32_t numOfDesc;
} SScanInfo;
typedef struct SSampleExecInfo {
double sampleRatio; // data block sample ratio, 1 by default
uint32_t seed; // random seed value
} SSampleExecInfo;
enum {
TABLE_SCAN__TABLE_ORDER = 1,
TABLE_SCAN__BLOCK_ORDER = 2,
};
typedef struct SAggSupporter {
SSHashObj* pResultRowHashTable; // quick locate the window object for each result
char* keyBuf; // window key buffer
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
int32_t currentPageId; // current write page id
} SAggSupporter;
typedef struct {
// if the upstream is an interval operator, the interval info is also kept here to get the time window to check if
// current data block needs to be loaded.
SInterval interval;
SAggSupporter* pAggSup;
SExprSupp* pExprSup; // expr supporter of aggregate operator
} SAggOptrPushDownInfo;
typedef struct STableMetaCacheInfo {
SLRUCache* pTableMetaEntryCache; // 100 by default
uint64_t metaFetch;
uint64_t cacheHit;
} STableMetaCacheInfo;
typedef struct STableScanBase {
STsdbReader* dataReader;
SFileBlockLoadRecorder readRecorder;
SQueryTableDataCond cond;
SAggOptrPushDownInfo pdInfo;
SColMatchInfo matchInfo;
SReadHandle readHandle;
SExprSupp pseudoSup;
STableMetaCacheInfo metaCache;
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
int32_t dataBlockLoadFlag;
SLimitInfo limitInfo;
// there are more than one table list exists in one task, if only one vnode exists.
STableListInfo* pTableListInfo;
} STableScanBase;
typedef struct STableScanInfo {
STableScanBase base;
SScanInfo scanInfo;
int32_t scanTimes;
SSDataBlock* pResBlock;
SSampleExecInfo sample; // sample execution info
int32_t currentGroupId;
int32_t currentTable;
int8_t scanMode;
int8_t assignBlockUid;
bool hasGroupByTag;
bool countOnly;
} STableScanInfo;
typedef struct STableMergeScanInfo {
int32_t tableStartIndex;
int32_t tableEndIndex;
bool hasGroupId;
uint64_t groupId;
SArray* queryConds; // array of queryTableDataCond
STableScanBase base;
int32_t bufPageSize;
uint32_t sortBufSize; // max buffer size for in-memory sort
SArray* pSortInfo;
SSortHandle* pSortHandle;
SSDataBlock* pSortInputBlock;
int64_t startTs; // sort start time
SArray* sortSourceParams;
SLimitInfo limitInfo;
int64_t numOfRows;
SScanInfo scanInfo;
int32_t scanTimes;
SSDataBlock* pResBlock;
SSampleExecInfo sample; // sample execution info
SSortExecInfo sortExecInfo;
} STableMergeScanInfo;
typedef struct STagScanInfo {
SColumnInfo* pCols;
SSDataBlock* pRes;
SColMatchInfo matchInfo;
int32_t curPos;
SReadHandle readHandle;
STableListInfo* pTableListInfo;
} STagScanInfo;
typedef enum EStreamScanMode {
STREAM_SCAN_FROM_READERHANDLE = 1,
STREAM_SCAN_FROM_RES,
STREAM_SCAN_FROM_UPDATERES,
STREAM_SCAN_FROM_DELETE_DATA,
STREAM_SCAN_FROM_DATAREADER_RETRIEVE,
STREAM_SCAN_FROM_DATAREADER_RANGE,
} EStreamScanMode;
enum {
PROJECT_RETRIEVE_CONTINUE = 0x1,
PROJECT_RETRIEVE_DONE = 0x2,
};
typedef struct SStreamAggSupporter {
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
SSDataBlock* pScanBlock;
SStreamState* pState;
int64_t gap; // stream session window gap
SqlFunctionCtx* pDummyCtx; // for combine
SSHashObj* pResultRows;
int32_t stateKeySize;
int16_t stateKeyType;
SDiskbasedBuf* pResultBuf;
} SStreamAggSupporter;
typedef struct SWindowSupporter {
SStreamAggSupporter* pStreamAggSup;
int64_t gap;
uint16_t parentType;
SAggSupporter* pIntervalAggSup;
} SWindowSupporter;
typedef struct SPartitionBySupporter {
SArray* pGroupCols; // group by columns, SArray<SColumn>
SArray* pGroupColVals; // current group column values, SArray<SGroupKeys>
char* keyBuf; // group by keys for hash
bool needCalc; // partition by column
} SPartitionBySupporter;
typedef struct SPartitionDataInfo {
uint64_t groupId;
char* tbname;
SArray* tags;
SArray* rowIds;
} SPartitionDataInfo;
typedef struct STimeWindowAggSupp {
int8_t calTrigger;
int8_t calTriggerSaved;
int64_t deleteMark;
int64_t deleteMarkSaved;
int64_t waterMark;
TSKEY maxTs;
TSKEY minTs;
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
} STimeWindowAggSupp;
typedef struct SStreamScanInfo {
SExprInfo* pPseudoExpr;
int32_t numOfPseudoExpr;
SExprSupp tbnameCalSup;
SExprSupp tagCalSup;
int32_t primaryTsIndex; // primary time stamp slot id
SReadHandle readHandle;
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here.
SColMatchInfo matchInfo;
SArray* pBlockLists; // multiple SSDatablock.
SSDataBlock* pRes; // result SSDataBlock
SSDataBlock* pUpdateRes; // update SSDataBlock
int32_t updateResIndex;
int32_t blockType; // current block type
int32_t validBlockIndex; // Is current data has returned?
uint64_t numOfExec; // execution times
STqReader* tqReader;
uint64_t groupId;
SUpdateInfo* pUpdateInfo;
EStreamScanMode scanMode;
struct SOperatorInfo* pStreamScanOp;
struct SOperatorInfo* pTableScanOp;
SArray* childIds;
SWindowSupporter windowSup;
SPartitionBySupporter partitionSup;
SExprSupp* pPartScalarSup;
bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA.
int32_t scanWinIndex; // for state operator
int32_t pullDataResIndex;
SSDataBlock* pPullDataRes; // pull data SSDataBlock
SSDataBlock* pDeleteDataRes; // delete data SSDataBlock
int32_t deleteDataIndex;
STimeWindow updateWin;
STimeWindowAggSupp twAggSup;
SSDataBlock* pUpdateDataRes;
// status for tmq
SNodeList* pGroupTags;
SNode* pTagCond;
SNode* pTagIndexCond;
// recover
int32_t blockRecoverContiCnt;
int32_t blockRecoverTotCnt;
SSDataBlock* pRecoverRes;
SSDataBlock* pCreateTbRes;
int8_t igCheckUpdate;
int8_t igExpired;
} SStreamScanInfo;
typedef struct {
SVnode* vnode;
SSDataBlock pRes; // result SSDataBlock
STsdbReader* dataReader;
SSnapContext* sContext;
STableListInfo* pTableListInfo;
} SStreamRawScanInfo;
typedef struct STableCountScanSupp {
int16_t dbNameSlotId;
int16_t stbNameSlotId;
int16_t tbCountSlotId;
bool groupByDbName;
bool groupByStbName;
char dbNameFilter[TSDB_DB_NAME_LEN];
char stbNameFilter[TSDB_TABLE_NAME_LEN];
} STableCountScanSupp;
typedef struct SOptrBasicInfo {
SResultRowInfo resultRowInfo;
SSDataBlock* pRes;
bool mergeResultBlock;
} SOptrBasicInfo;
typedef struct SIntervalAggOperatorInfo {
SOptrBasicInfo binfo; // basic info
SAggSupporter aggSup; // aggregate supporter
SExprSupp scalarSupp; // supporter for perform scalar function
SGroupResInfo groupResInfo; // multiple results build supporter
SInterval interval; // interval info
int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator.
STimeWindow win; // query time range
bool timeWindowInterpo; // interpolation needed or not
SArray* pInterpCols; // interpolation columns
int32_t resultTsOrder; // result timestamp order
int32_t inputOrder; // input data ts order
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
STimeWindowAggSupp twAggSup;
SArray* pPrevValues; // SArray<SGroupKeys> used to keep the previous not null value for interpolation.
} SIntervalAggOperatorInfo;
typedef struct SMergeAlignedIntervalAggOperatorInfo {
SIntervalAggOperatorInfo* intervalAggOperatorInfo;
uint64_t groupId; // current groupId
int64_t curTs; // current ts
SSDataBlock* prefetchedBlock;
SResultRow* pResultRow;
} SMergeAlignedIntervalAggOperatorInfo;
typedef struct SStreamIntervalOperatorInfo {
SOptrBasicInfo binfo; // basic info
SAggSupporter aggSup; // aggregate supporter
SExprSupp scalarSupp; // supporter for perform scalar function
SGroupResInfo groupResInfo; // multiple results build supporter
SInterval interval; // interval info
int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator.
STimeWindowAggSupp twAggSup;
bool invertible;
bool ignoreExpiredData;
bool ignoreExpiredDataSaved;
SArray* pDelWins; // SWinRes
int32_t delIndex;
SSDataBlock* pDelRes;
SPhysiNode* pPhyNode; // create new child
SHashObj* pPullDataMap;
SArray* pPullWins; // SPullWindowInfo
int32_t pullIndex;
SSDataBlock* pPullDataRes;
bool isFinal;
SArray* pChildren;
SStreamState* pState;
SWinKey delKey;
uint64_t numOfDatapack;
SArray* pUpdated;
SSHashObj* pUpdatedMap;
int64_t dataVersion;
} SStreamIntervalOperatorInfo;
typedef struct SDataGroupInfo {
uint64_t groupId;
int64_t numOfRows;
SArray* pPageList;
} SDataGroupInfo;
typedef struct SWindowRowsSup {
STimeWindow win;
TSKEY prevTs;
int32_t startRowIndex;
int32_t numOfRows;
uint64_t groupId;
} SWindowRowsSup;
typedef struct SResultWindowInfo {
void* pOutputBuf;
SSessionKey sessionWin;
bool isOutput;
} SResultWindowInfo;
typedef struct SStreamSessionAggOperatorInfo {
SOptrBasicInfo binfo;
SStreamAggSupporter streamAggSup;
SExprSupp scalarSupp; // supporter for perform scalar function
SGroupResInfo groupResInfo;
int32_t primaryTsIndex; // primary timestamp slot id
int32_t endTsIndex; // window end timestamp slot id
int32_t order; // current SSDataBlock scan order
STimeWindowAggSupp twAggSup;
SSDataBlock* pWinBlock; // window result
SSDataBlock* pDelRes; // delete result
SSDataBlock* pUpdateRes; // update window
bool returnUpdate;
SSHashObj* pStDeleted;
void* pDelIterator;
SArray* pChildren; // cache for children's result; final stream operator
SPhysiNode* pPhyNode; // create new child
bool isFinal;
bool ignoreExpiredData;
bool ignoreExpiredDataSaved;
SArray* pUpdated;
SSHashObj* pStUpdated;
int64_t dataVersion;
} SStreamSessionAggOperatorInfo;
typedef struct SStreamStateAggOperatorInfo {
SOptrBasicInfo binfo;
SStreamAggSupporter streamAggSup;
SExprSupp scalarSupp; // supporter for perform scalar function
SGroupResInfo groupResInfo;
int32_t primaryTsIndex; // primary timestamp slot id
STimeWindowAggSupp twAggSup;
SColumn stateCol;
SSDataBlock* pDelRes;
SSHashObj* pSeDeleted;
void* pDelIterator;
SArray* pChildren; // cache for children's result;
bool ignoreExpiredData;
bool ignoreExpiredDataSaved;
SArray* pUpdated;
SSHashObj* pSeUpdated;
int64_t dataVersion;
} SStreamStateAggOperatorInfo;
typedef struct SStreamPartitionOperatorInfo {
SOptrBasicInfo binfo;
SPartitionBySupporter partitionSup;
SExprSupp scalarSup;
SExprSupp tbnameCalSup;
SExprSupp tagCalSup;
SHashObj* pPartitions;
void* parIte;
void* pTbNameIte;
SSDataBlock* pInputDataBlock;
int32_t tsColIndex;
SSDataBlock* pDelRes;
SSDataBlock* pCreateTbRes;
} SStreamPartitionOperatorInfo;
typedef struct SStreamFillSupporter {
int32_t type; // fill type
SInterval interval;
SResultRowData prev;
SResultRowData cur;
SResultRowData next;
SResultRowData nextNext;
SFillColInfo* pAllColInfo; // fill exprs and not fill exprs
SExprSupp notFillExprSup;
int32_t numOfAllCols; // number of all exprs, including the tags columns
int32_t numOfFillCols;
int32_t numOfNotFillCols;
int32_t rowSize;
SSHashObj* pResMap;
bool hasDelete;
} SStreamFillSupporter;
typedef struct SStreamFillOperatorInfo {
SStreamFillSupporter* pFillSup;
SSDataBlock* pRes;
SSDataBlock* pSrcBlock;
int32_t srcRowIndex;
SSDataBlock* pSrcDelBlock;
int32_t srcDelRowIndex;
SSDataBlock* pDelRes;
SColMatchInfo matchInfo;
int32_t primaryTsCol;
int32_t primarySrcSlotId;
SStreamFillInfo* pFillInfo;
} SStreamFillOperatorInfo;
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode);
int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, const char* dbName, SExecTaskInfo* pTaskInfo);
void cleanupQueriedTableScanInfo(SSchemaInfo* pSchemaInfo);
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock);
void cleanupBasicInfo(SOptrBasicInfo* pInfo);
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr);
void cleanupExprSupp(SExprSupp* pSup);
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs);
int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
const char* pkey, void* pState);
void cleanupAggSup(SAggSupporter* pAggSup);
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows);
void doBuildStreamResBlock(struct SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
SDiskbasedBuf* pBuf);
void doBuildResultDatablock(struct SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
SDiskbasedBuf* pBuf);
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo);
bool hasSlimitOffsetInfo(SLimitInfo* pLimitInfo);
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo);
void resetLimitInfoForNextGroup(SLimitInfo* pLimitInfo);
bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData,
int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput);
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart);
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs,
struct SOperatorInfo* pOperator);
STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order);
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
extern void doDestroyExchangeOperatorInfo(void* param);
void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo);
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock,
int32_t rows, const char* idStr, STableMetaCacheInfo* pCache);
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name);
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset);
void clearResultRowInitFlag(SqlFunctionCtx* pCtx, int32_t numOfOutput);
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
bool isIntervalQuery, SAggSupporter* pSup, bool keepGroup);
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
int32_t numOfOutput, SArray* pPseudoList);
void setInputDataBlock(SExprSupp* pExprSupp, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol);
int32_t checkForQueryBuf(size_t numOfTables);
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo* pTask, SReadHandle* readHandle);
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
int32_t order);
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
__block_search_fn_t searchFn, STableQueryInfo* item, int32_t order);
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize);
void getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId, SSessionKey* pKey);
bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap);
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup);
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup);
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
uint64_t* pGp, void* pTbName);
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId);
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
bool groupbyTbname(SNodeList* pGroupList);
int32_t buildDataBlockFromGroupRes(struct SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
SGroupResInfo* pGroupResInfo);
int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size);
int32_t buildSessionResultDataBlock(struct SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock,
SExprSupp* pSup, SGroupResInfo* pGroupResInfo);
int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId,
SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup);
int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult);
int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize);
void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order);
int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos, int32_t order,
int64_t* pData);
void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, uint64_t groupId,
SSDataBlock* pSrcBlock, int32_t rowId, SSDataBlock* pDestBlock);
SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag);
SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs);
void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx,
SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo);
void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset);
void doClearBufferedBlocks(SStreamScanInfo* pInfo);
uint64_t calcGroupId(char* pData, int32_t len);
#ifdef __cplusplus
}
#endif
#endif /*_TD_EXECUTOR_INT_H*/
\ No newline at end of file
#endif // TDENGINE_EXECUTORINT_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_EXECUTORIMPL_H
#define TDENGINE_EXECUTORIMPL_H
#ifdef __cplusplus
extern "C" {
#endif
#include "os.h"
#include "tcommon.h"
#include "tlosertree.h"
#include "tsort.h"
#include "ttszip.h"
#include "tvariant.h"
#include "dataSinkMgt.h"
#include "executil.h"
#include "executor.h"
#include "planner.h"
#include "scalar.h"
#include "taosdef.h"
#include "tarray.h"
#include "tfill.h"
#include "thash.h"
#include "tlockfree.h"
#include "tmsg.h"
#include "tpagedbuf.h"
#include "tstream.h"
#include "tstreamUpdate.h"
#include "executorInt.h"
#include "vnode.h"
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
#define IS_VALID_SESSION_WIN(winInfo) ((winInfo).sessionWin.win.skey > 0)
#define SET_SESSION_WIN_INVALID(winInfo) ((winInfo).sessionWin.win.skey = INT64_MIN)
#define IS_INVALID_SESSION_WIN_KEY(winKey) ((winKey).win.skey <= 0)
#define SET_SESSION_WIN_KEY_INVALID(pWinKey) ((pWinKey)->win.skey = INT64_MIN)
enum {
// when this task starts to execute, this status will set
TASK_NOT_COMPLETED = 0x1u,
/* Task is over
* 1. this status is used in one row result query process, e.g., count/sum/first/last/ avg...etc.
* 2. when all data within queried time window, it is also denoted as query_completed
*/
TASK_COMPLETED = 0x2u,
};
/**
* If the number of generated results is greater than this value,
* query query will be halt and return results to client immediate.
*/
typedef struct SResultInfo { // TODO refactor
int64_t totalRows; // total generated result size in rows
int64_t totalBytes; // total results in bytes.
int32_t capacity; // capacity of current result output buffer
int32_t threshold; // result size threshold in rows.
} SResultInfo;
typedef struct STableQueryInfo {
TSKEY lastKey; // last check ts, todo remove it later
SResultRowPosition pos; // current active time window
} STableQueryInfo;
typedef struct SLimit {
int64_t limit;
int64_t offset;
} SLimit;
typedef struct STableScanAnalyzeInfo SFileBlockLoadRecorder;
enum {
STREAM_RECOVER_STEP__NONE = 0,
STREAM_RECOVER_STEP__PREPARE1,
STREAM_RECOVER_STEP__PREPARE2,
STREAM_RECOVER_STEP__SCAN1,
STREAM_RECOVER_STEP__SCAN2,
};
typedef struct {
STqOffsetVal currentOffset; // for tmq
SMqMetaRsp metaRsp; // for tmq fetching meta
int64_t snapshotVer;
SPackedData submit;
SSchemaWrapper* schema;
char tbName[TSDB_TABLE_NAME_LEN];
int8_t recoverStep;
int8_t recoverScanFinished;
SQueryTableDataCond tableCond;
int64_t fillHistoryVer1;
int64_t fillHistoryVer2;
SStreamState* pState;
int64_t dataVersion;
int64_t checkPointId;
} SStreamTaskInfo;
typedef struct {
char* tablename;
char* dbname;
int32_t tversion;
SSchemaWrapper* sw;
SSchemaWrapper* qsw;
} SSchemaInfo;
typedef struct SExchangeOpStopInfo {
int32_t operatorType;
int64_t refId;
} SExchangeOpStopInfo;
typedef struct SExprSupp {
SExprInfo* pExprInfo;
int32_t numOfExprs; // the number of scalar expression in group operator
SqlFunctionCtx* pCtx;
int32_t* rowEntryInfoOffset; // offset value for each row result cell info
SFilterInfo* pFilterInfo;
} SExprSupp;
typedef enum {
EX_SOURCE_DATA_NOT_READY = 0x1,
EX_SOURCE_DATA_READY = 0x2,
EX_SOURCE_DATA_EXHAUSTED = 0x3,
} EX_SOURCE_STATUS;
#define COL_MATCH_FROM_COL_ID 0x1
#define COL_MATCH_FROM_SLOT_ID 0x2
typedef struct SLoadRemoteDataInfo {
uint64_t totalSize; // total load bytes from remote
uint64_t totalRows; // total number of rows
uint64_t totalElapsed; // total elapsed time
} SLoadRemoteDataInfo;
typedef struct SLimitInfo {
SLimit limit;
SLimit slimit;
uint64_t currentGroupId;
int64_t remainGroupOffset;
int64_t numOfOutputGroups;
int64_t remainOffset;
int64_t numOfOutputRows;
} SLimitInfo;
typedef struct SExchangeInfo {
SArray* pSources;
SArray* pSourceDataInfo;
tsem_t ready;
void* pTransporter;
// SArray<SSDataBlock*>, result block list, used to keep the multi-block that
// passed by downstream operator
SArray* pResultBlockList;
SArray* pRecycledBlocks; // build a pool for small data block to avoid to repeatly create and then destroy.
SSDataBlock* pDummyBlock; // dummy block, not keep data
bool seqLoadData; // sequential load data or not, false by default
int32_t current;
SLoadRemoteDataInfo loadInfo;
uint64_t self;
SLimitInfo limitInfo;
int64_t openedTs; // start exec time stamp, todo: move to SLoadRemoteDataInfo
} SExchangeInfo;
typedef struct SScanInfo {
int32_t numOfAsc;
int32_t numOfDesc;
} SScanInfo;
typedef struct SSampleExecInfo {
double sampleRatio; // data block sample ratio, 1 by default
uint32_t seed; // random seed value
} SSampleExecInfo;
enum {
TABLE_SCAN__TABLE_ORDER = 1,
TABLE_SCAN__BLOCK_ORDER = 2,
};
typedef struct SAggSupporter {
SSHashObj* pResultRowHashTable; // quick locate the window object for each result
char* keyBuf; // window key buffer
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
int32_t currentPageId; // current write page id
} SAggSupporter;
typedef struct {
// if the upstream is an interval operator, the interval info is also kept here to get the time window to check if
// current data block needs to be loaded.
SInterval interval;
SAggSupporter* pAggSup;
SExprSupp* pExprSup; // expr supporter of aggregate operator
} SAggOptrPushDownInfo;
typedef struct STableMetaCacheInfo {
SLRUCache* pTableMetaEntryCache; // 100 by default
uint64_t metaFetch;
uint64_t cacheHit;
} STableMetaCacheInfo;
typedef struct STableScanBase {
STsdbReader* dataReader;
SFileBlockLoadRecorder readRecorder;
SQueryTableDataCond cond;
SAggOptrPushDownInfo pdInfo;
SColMatchInfo matchInfo;
SReadHandle readHandle;
SExprSupp pseudoSup;
STableMetaCacheInfo metaCache;
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
int32_t dataBlockLoadFlag;
SLimitInfo limitInfo;
// there are more than one table list exists in one task, if only one vnode exists.
STableListInfo* pTableListInfo;
} STableScanBase;
typedef struct STableScanInfo {
STableScanBase base;
SScanInfo scanInfo;
int32_t scanTimes;
SSDataBlock* pResBlock;
SSampleExecInfo sample; // sample execution info
int32_t currentGroupId;
int32_t currentTable;
int8_t scanMode;
int8_t assignBlockUid;
bool hasGroupByTag;
bool countOnly;
} STableScanInfo;
typedef struct STableMergeScanInfo {
int32_t tableStartIndex;
int32_t tableEndIndex;
bool hasGroupId;
uint64_t groupId;
SArray* queryConds; // array of queryTableDataCond
STableScanBase base;
int32_t bufPageSize;
uint32_t sortBufSize; // max buffer size for in-memory sort
SArray* pSortInfo;
SSortHandle* pSortHandle;
SSDataBlock* pSortInputBlock;
int64_t startTs; // sort start time
SArray* sortSourceParams;
SLimitInfo limitInfo;
int64_t numOfRows;
SScanInfo scanInfo;
int32_t scanTimes;
SSDataBlock* pResBlock;
SSampleExecInfo sample; // sample execution info
SSortExecInfo sortExecInfo;
} STableMergeScanInfo;
typedef struct STagScanInfo {
SColumnInfo* pCols;
SSDataBlock* pRes;
SColMatchInfo matchInfo;
int32_t curPos;
SReadHandle readHandle;
STableListInfo* pTableListInfo;
} STagScanInfo;
typedef enum EStreamScanMode {
STREAM_SCAN_FROM_READERHANDLE = 1,
STREAM_SCAN_FROM_RES,
STREAM_SCAN_FROM_UPDATERES,
STREAM_SCAN_FROM_DELETE_DATA,
STREAM_SCAN_FROM_DATAREADER_RETRIEVE,
STREAM_SCAN_FROM_DATAREADER_RANGE,
} EStreamScanMode;
enum {
PROJECT_RETRIEVE_CONTINUE = 0x1,
PROJECT_RETRIEVE_DONE = 0x2,
};
typedef struct SStreamAggSupporter {
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
SSDataBlock* pScanBlock;
SStreamState* pState;
int64_t gap; // stream session window gap
SqlFunctionCtx* pDummyCtx; // for combine
SSHashObj* pResultRows;
int32_t stateKeySize;
int16_t stateKeyType;
SDiskbasedBuf* pResultBuf;
} SStreamAggSupporter;
typedef struct SWindowSupporter {
SStreamAggSupporter* pStreamAggSup;
int64_t gap;
uint16_t parentType;
SAggSupporter* pIntervalAggSup;
} SWindowSupporter;
typedef struct SPartitionBySupporter {
SArray* pGroupCols; // group by columns, SArray<SColumn>
SArray* pGroupColVals; // current group column values, SArray<SGroupKeys>
char* keyBuf; // group by keys for hash
bool needCalc; // partition by column
} SPartitionBySupporter;
typedef struct SPartitionDataInfo {
uint64_t groupId;
char* tbname;
SArray* tags;
SArray* rowIds;
} SPartitionDataInfo;
typedef struct STimeWindowAggSupp {
int8_t calTrigger;
int8_t calTriggerSaved;
int64_t deleteMark;
int64_t deleteMarkSaved;
int64_t waterMark;
TSKEY maxTs;
TSKEY minTs;
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
} STimeWindowAggSupp;
typedef struct SStreamScanInfo {
SExprInfo* pPseudoExpr;
int32_t numOfPseudoExpr;
SExprSupp tbnameCalSup;
SExprSupp tagCalSup;
int32_t primaryTsIndex; // primary time stamp slot id
SReadHandle readHandle;
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here.
SColMatchInfo matchInfo;
SArray* pBlockLists; // multiple SSDatablock.
SSDataBlock* pRes; // result SSDataBlock
SSDataBlock* pUpdateRes; // update SSDataBlock
int32_t updateResIndex;
int32_t blockType; // current block type
int32_t validBlockIndex; // Is current data has returned?
uint64_t numOfExec; // execution times
STqReader* tqReader;
uint64_t groupId;
SUpdateInfo* pUpdateInfo;
EStreamScanMode scanMode;
struct SOperatorInfo* pStreamScanOp;
struct SOperatorInfo* pTableScanOp;
SArray* childIds;
SWindowSupporter windowSup;
SPartitionBySupporter partitionSup;
SExprSupp* pPartScalarSup;
bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA.
int32_t scanWinIndex; // for state operator
int32_t pullDataResIndex;
SSDataBlock* pPullDataRes; // pull data SSDataBlock
SSDataBlock* pDeleteDataRes; // delete data SSDataBlock
int32_t deleteDataIndex;
STimeWindow updateWin;
STimeWindowAggSupp twAggSup;
SSDataBlock* pUpdateDataRes;
// status for tmq
SNodeList* pGroupTags;
SNode* pTagCond;
SNode* pTagIndexCond;
// recover
int32_t blockRecoverContiCnt;
int32_t blockRecoverTotCnt;
SSDataBlock* pRecoverRes;
SSDataBlock* pCreateTbRes;
int8_t igCheckUpdate;
int8_t igExpired;
} SStreamScanInfo;
typedef struct {
SVnode* vnode;
SSDataBlock pRes; // result SSDataBlock
STsdbReader* dataReader;
SSnapContext* sContext;
STableListInfo* pTableListInfo;
} SStreamRawScanInfo;
typedef struct STableCountScanSupp {
int16_t dbNameSlotId;
int16_t stbNameSlotId;
int16_t tbCountSlotId;
bool groupByDbName;
bool groupByStbName;
char dbNameFilter[TSDB_DB_NAME_LEN];
char stbNameFilter[TSDB_TABLE_NAME_LEN];
} STableCountScanSupp;
typedef struct SOptrBasicInfo {
SResultRowInfo resultRowInfo;
SSDataBlock* pRes;
bool mergeResultBlock;
} SOptrBasicInfo;
typedef struct SIntervalAggOperatorInfo {
SOptrBasicInfo binfo; // basic info
SAggSupporter aggSup; // aggregate supporter
SExprSupp scalarSupp; // supporter for perform scalar function
SGroupResInfo groupResInfo; // multiple results build supporter
SInterval interval; // interval info
int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator.
STimeWindow win; // query time range
bool timeWindowInterpo; // interpolation needed or not
SArray* pInterpCols; // interpolation columns
int32_t resultTsOrder; // result timestamp order
int32_t inputOrder; // input data ts order
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
STimeWindowAggSupp twAggSup;
SArray* pPrevValues; // SArray<SGroupKeys> used to keep the previous not null value for interpolation.
} SIntervalAggOperatorInfo;
typedef struct SMergeAlignedIntervalAggOperatorInfo {
SIntervalAggOperatorInfo* intervalAggOperatorInfo;
uint64_t groupId; // current groupId
int64_t curTs; // current ts
SSDataBlock* prefetchedBlock;
SResultRow* pResultRow;
} SMergeAlignedIntervalAggOperatorInfo;
typedef struct SStreamIntervalOperatorInfo {
SOptrBasicInfo binfo; // basic info
SAggSupporter aggSup; // aggregate supporter
SExprSupp scalarSupp; // supporter for perform scalar function
SGroupResInfo groupResInfo; // multiple results build supporter
SInterval interval; // interval info
int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator.
STimeWindowAggSupp twAggSup;
bool invertible;
bool ignoreExpiredData;
bool ignoreExpiredDataSaved;
SArray* pDelWins; // SWinRes
int32_t delIndex;
SSDataBlock* pDelRes;
SPhysiNode* pPhyNode; // create new child
SHashObj* pPullDataMap;
SArray* pPullWins; // SPullWindowInfo
int32_t pullIndex;
SSDataBlock* pPullDataRes;
bool isFinal;
SArray* pChildren;
SStreamState* pState;
SWinKey delKey;
uint64_t numOfDatapack;
SArray* pUpdated;
SSHashObj* pUpdatedMap;
int64_t dataVersion;
} SStreamIntervalOperatorInfo;
typedef struct SDataGroupInfo {
uint64_t groupId;
int64_t numOfRows;
SArray* pPageList;
} SDataGroupInfo;
typedef struct SWindowRowsSup {
STimeWindow win;
TSKEY prevTs;
int32_t startRowIndex;
int32_t numOfRows;
uint64_t groupId;
} SWindowRowsSup;
typedef struct SResultWindowInfo {
void* pOutputBuf;
SSessionKey sessionWin;
bool isOutput;
} SResultWindowInfo;
typedef struct SStateWindowInfo {
SResultWindowInfo winInfo;
SStateKeys* pStateKey;
} SStateWindowInfo;
typedef struct SStreamSessionAggOperatorInfo {
SOptrBasicInfo binfo;
SStreamAggSupporter streamAggSup;
SExprSupp scalarSupp; // supporter for perform scalar function
SGroupResInfo groupResInfo;
int32_t primaryTsIndex; // primary timestamp slot id
int32_t endTsIndex; // window end timestamp slot id
int32_t order; // current SSDataBlock scan order
STimeWindowAggSupp twAggSup;
SSDataBlock* pWinBlock; // window result
SSDataBlock* pDelRes; // delete result
SSDataBlock* pUpdateRes; // update window
bool returnUpdate;
SSHashObj* pStDeleted;
void* pDelIterator;
SArray* pChildren; // cache for children's result; final stream operator
SPhysiNode* pPhyNode; // create new child
bool isFinal;
bool ignoreExpiredData;
bool ignoreExpiredDataSaved;
SArray* pUpdated;
SSHashObj* pStUpdated;
int64_t dataVersion;
} SStreamSessionAggOperatorInfo;
typedef struct SStreamStateAggOperatorInfo {
SOptrBasicInfo binfo;
SStreamAggSupporter streamAggSup;
SExprSupp scalarSupp; // supporter for perform scalar function
SGroupResInfo groupResInfo;
int32_t primaryTsIndex; // primary timestamp slot id
STimeWindowAggSupp twAggSup;
SColumn stateCol;
SSDataBlock* pDelRes;
SSHashObj* pSeDeleted;
void* pDelIterator;
SArray* pChildren; // cache for children's result;
bool ignoreExpiredData;
bool ignoreExpiredDataSaved;
SArray* pUpdated;
SSHashObj* pSeUpdated;
int64_t dataVersion;
} SStreamStateAggOperatorInfo;
typedef struct SStreamPartitionOperatorInfo {
SOptrBasicInfo binfo;
SPartitionBySupporter partitionSup;
SExprSupp scalarSup;
SExprSupp tbnameCalSup;
SExprSupp tagCalSup;
SHashObj* pPartitions;
void* parIte;
void* pTbNameIte;
SSDataBlock* pInputDataBlock;
int32_t tsColIndex;
SSDataBlock* pDelRes;
SSDataBlock* pCreateTbRes;
} SStreamPartitionOperatorInfo;
typedef struct SStreamFillSupporter {
int32_t type; // fill type
SInterval interval;
SResultRowData prev;
SResultRowData cur;
SResultRowData next;
SResultRowData nextNext;
SFillColInfo* pAllColInfo; // fill exprs and not fill exprs
SExprSupp notFillExprSup;
int32_t numOfAllCols; // number of all exprs, including the tags columns
int32_t numOfFillCols;
int32_t numOfNotFillCols;
int32_t rowSize;
SSHashObj* pResMap;
bool hasDelete;
} SStreamFillSupporter;
typedef struct SStreamFillOperatorInfo {
SStreamFillSupporter* pFillSup;
SSDataBlock* pRes;
SSDataBlock* pSrcBlock;
int32_t srcRowIndex;
SSDataBlock* pSrcDelBlock;
int32_t srcDelRowIndex;
SSDataBlock* pDelRes;
SColMatchInfo matchInfo;
int32_t primaryTsCol;
int32_t primarySrcSlotId;
SStreamFillInfo* pFillInfo;
} SStreamFillOperatorInfo;
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode);
int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, const char* dbName, SExecTaskInfo* pTaskInfo);
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock);
void cleanupBasicInfo(SOptrBasicInfo* pInfo);
int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr);
void cleanupExprSupp(SExprSupp* pSup);
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs);
int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
const char* pkey, void* pState);
void cleanupAggSup(SAggSupporter* pAggSup);
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows);
void doBuildStreamResBlock(struct SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
SDiskbasedBuf* pBuf);
void doBuildResultDatablock(struct SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
SDiskbasedBuf* pBuf);
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo);
bool hasSlimitOffsetInfo(SLimitInfo* pLimitInfo);
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo);
void resetLimitInfoForNextGroup(SLimitInfo* pLimitInfo);
bool applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData,
int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput);
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart);
void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int32_t dataLen, int64_t startTs,
struct SOperatorInfo* pOperator);
STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order);
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
extern void doDestroyExchangeOperatorInfo(void* param);
void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo);
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock,
int32_t rows, const char* idStr, STableMetaCacheInfo* pCache);
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name);
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset);
void clearResultRowInitFlag(SqlFunctionCtx* pCtx, int32_t numOfOutput);
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData,
int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo,
bool isIntervalQuery, SAggSupporter* pSup, bool keepGroup);
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
int32_t numOfOutput, SArray* pPseudoList);
void setInputDataBlock(SExprSupp* pExprSupp, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol);
int32_t checkForQueryBuf(size_t numOfTables);
SArray* getTableListInfo(const SExecTaskInfo* pTaskInfo);
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo* pTask, SReadHandle* readHandle);
int32_t getOperatorExplainExecInfo(struct SOperatorInfo* operatorInfo, SArray* pExecInfoList);
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
int32_t order);
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
__block_search_fn_t searchFn, STableQueryInfo* item, int32_t order);
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize);
void getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId, SSessionKey* pKey);
bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap);
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup);
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup);
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
uint64_t* pGp, void* pTbName);
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId);
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
bool groupbyTbname(SNodeList* pGroupList);
int32_t buildDataBlockFromGroupRes(struct SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup,
SGroupResInfo* pGroupResInfo);
int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size);
int32_t buildSessionResultDataBlock(struct SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock,
SExprSupp* pSup, SGroupResInfo* pGroupResInfo);
int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId,
SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup);
int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult);
int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize);
void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order);
int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos, int32_t order,
int64_t* pData);
void appendCreateTableRow(SStreamState* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, uint64_t groupId,
SSDataBlock* pSrcBlock, int32_t rowId, SSDataBlock* pDestBlock);
SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag);
SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs);
void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx,
SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo);
void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset);
void doClearBufferedBlocks(SStreamScanInfo* pInfo);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_EXECUTORIMPL_H
......@@ -157,6 +157,7 @@ void destroyOperator(SOperatorInfo* pOperator);
SOperatorInfo* extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, const char* id);
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder);
int32_t stopTableScanOperator(SOperatorInfo* pOperator, const char* pIdStr);
int32_t getOperatorExplainExecInfo(struct SOperatorInfo* operatorInfo, SArray* pExecInfoList);
#ifdef __cplusplus
}
......
......@@ -22,6 +22,17 @@ extern "C" {
#define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.str)
enum {
// when this task starts to execute, this status will set
TASK_NOT_COMPLETED = 0x1u,
/* Task is over
* 1. this status is used in one row result query process, e.g., count/sum/first/last/ avg...etc.
* 2. when all data within queried time window, it is also denoted as query_completed
*/
TASK_COMPLETED = 0x2u,
};
typedef struct STaskIdInfo {
uint64_t queryId; // this is also a request id
uint64_t subplanId;
......@@ -44,6 +55,23 @@ typedef struct STaskStopInfo {
SArray* pStopInfo;
} STaskStopInfo;
typedef struct {
STqOffsetVal currentOffset; // for tmq
SMqMetaRsp metaRsp; // for tmq fetching meta
int64_t snapshotVer;
SPackedData submit; // todo remove it
SSchemaWrapper* schema;
char tbName[TSDB_TABLE_NAME_LEN]; // this is the current scan table: todo refactor
int8_t recoverStep;
int8_t recoverScanFinished;
SQueryTableDataCond tableCond;
int64_t fillHistoryVer1;
int64_t fillHistoryVer2;
SStreamState* pState;
int64_t dataVersion;
int64_t checkPointId;
} SStreamTaskInfo;
struct SExecTaskInfo {
STaskIdInfo id;
uint32_t status;
......@@ -75,6 +103,7 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
int32_t vgId, char* sql, EOPTR_EXEC_MODEL model);
int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo);
SArray* getTableListInfo(const SExecTaskInfo* pTaskInfo);
#ifdef __cplusplus
}
......
......@@ -20,16 +20,16 @@
#include "tfill.h"
#include "tname.h"
#include "tdatablock.h"
#include "tglobal.h"
#include "executorimpl.h"
#include "executorInt.h"
#include "index.h"
#include "operator.h"
#include "query.h"
#include "querytask.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "tglobal.h"
#include "thash.h"
#include "ttypes.h"
#include "operator.h"
#include "querytask.h"
typedef struct {
bool hasAgg;
......
......@@ -20,12 +20,12 @@
#include "tdatablock.h"
#include "tmsg.h"
#include "executorimpl.h"
#include "executorInt.h"
#include "operator.h"
#include "querytask.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
#include "operator.h"
#include "querytask.h"
typedef struct SCacheRowsScanInfo {
SSDataBlock* pRes;
......
......@@ -15,7 +15,7 @@
#include "dataSinkInt.h"
#include "dataSinkMgt.h"
#include "executorimpl.h"
#include "executorInt.h"
#include "planner.h"
#include "tcompression.h"
#include "tdatablock.h"
......
......@@ -15,7 +15,7 @@
#include "dataSinkInt.h"
#include "dataSinkMgt.h"
#include "executorimpl.h"
#include "executorInt.h"
#include "planner.h"
#include "tcompression.h"
#include "tdatablock.h"
......
......@@ -15,7 +15,7 @@
#include "dataSinkInt.h"
#include "dataSinkMgt.h"
#include "executorimpl.h"
#include "executorInt.h"
#include "planner.h"
#include "tcompression.h"
#include "tdatablock.h"
......
......@@ -13,16 +13,16 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executorimpl.h"
#include "executorInt.h"
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
#include "operator.h"
#include "querytask.h"
#include "tcommon.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "ttime.h"
#include "operator.h"
#include "querytask.h"
typedef struct SEventWindowOperatorInfo {
SOptrBasicInfo binfo;
......
......@@ -13,19 +13,19 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executorInt.h"
#include "filter.h"
#include "function.h"
#include "os.h"
#include "tname.h"
#include "tref.h"
#include "tdatablock.h"
#include "tmsg.h"
#include "executorimpl.h"
#include "index.h"
#include "query.h"
#include "thash.h"
#include "operator.h"
#include "os.h"
#include "query.h"
#include "querytask.h"
#include "tdatablock.h"
#include "thash.h"
#include "tmsg.h"
#include "tname.h"
#include "tref.h"
typedef struct SFetchRspHandleWrapper {
uint32_t exchangeId;
......
......@@ -24,9 +24,9 @@
#include "ttime.h"
#include "executil.h"
#include "executorimpl.h"
#include "tcompression.h"
#include "executorInt.h"
#include "querytask.h"
#include "tcompression.h"
typedef struct STableListIdInfo {
uint64_t suid;
......
......@@ -14,14 +14,14 @@
*/
#include "executor.h"
#include "executorimpl.h"
#include "executorInt.h"
#include "operator.h"
#include "planner.h"
#include "querytask.h"
#include "tdatablock.h"
#include "tref.h"
#include "tudf.h"
#include "vnode.h"
#include "operator.h"
#include "querytask.h"
static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT;
int32_t exchangeObjRefPool = -1;
......@@ -718,8 +718,6 @@ void qRemoveTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo) {
taosArrayRemove(pTaskInfo->stopInfo.pStopInfo, idx);
}
taosWUnLockLatch(&pTaskInfo->stopInfo.lock);
return;
}
void qStopTaskOperators(SExecTaskInfo* pTaskInfo) {
......@@ -1304,3 +1302,25 @@ SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo) {
taosArrayDestroy(plist);
return pUidList;
}
static void extractTableList(SArray* pList, const SOperatorInfo* pOperator) {
if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
SStreamScanInfo* pScanInfo = pOperator->info;
STableScanInfo* pTableScanInfo = pScanInfo->pTableScanOp->info;
taosArrayPush(pList, &pTableScanInfo->base.pTableListInfo);
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
STableScanInfo* pScanInfo = pOperator->info;
taosArrayPush(pList, &pScanInfo->base.pTableListInfo);
} else {
if (pOperator->pDownstream != NULL && pOperator->pDownstream[0] != NULL) {
extractTableList(pList, pOperator->pDownstream[0]);
}
}
}
SArray* getTableListInfo(const SExecTaskInfo* pTaskInfo) {
SArray* pArray = taosArrayInit(0, POINTER_BYTES);
SOperatorInfo* pOperator = pTaskInfo->pRoot;
extractTableList(pArray, pOperator);
return pArray;
}
\ No newline at end of file
......@@ -25,15 +25,15 @@
#include "tmsg.h"
#include "ttime.h"
#include "executorimpl.h"
#include "executorInt.h"
#include "index.h"
#include "operator.h"
#include "query.h"
#include "querytask.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
#include "vnode.h"
#include "operator.h"
#include "querytask.h"
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
......@@ -1059,37 +1059,6 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo*
return TSDB_CODE_SUCCESS;
}
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList) {
SExplainExecInfo execInfo = {0};
SExplainExecInfo* pExplainInfo = taosArrayPush(pExecInfoList, &execInfo);
pExplainInfo->numOfRows = operatorInfo->resultInfo.totalRows;
pExplainInfo->startupCost = operatorInfo->cost.openCost;
pExplainInfo->totalCost = operatorInfo->cost.totalCost;
pExplainInfo->verboseLen = 0;
pExplainInfo->verboseInfo = NULL;
if (operatorInfo->fpSet.getExplainFn) {
int32_t code =
operatorInfo->fpSet.getExplainFn(operatorInfo, &pExplainInfo->verboseInfo, &pExplainInfo->verboseLen);
if (code) {
qError("%s operator getExplainFn failed, code:%s", GET_TASKID(operatorInfo->pTaskInfo), tstrerror(code));
return code;
}
}
int32_t code = 0;
for (int32_t i = 0; i < operatorInfo->numOfDownstream; ++i) {
code = getOperatorExplainExecInfo(operatorInfo->pDownstream[i], pExecInfoList);
if (code != TSDB_CODE_SUCCESS) {
// taosMemoryFreeClear(*pRes);
return TSDB_CODE_OUT_OF_MEMORY;
}
}
return TSDB_CODE_SUCCESS;
}
int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId,
SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup) {
SWinKey key = {
......@@ -1331,25 +1300,3 @@ void qStreamCloseTsdbReader(void* task) {
}
}
}
static void extractTableList(SArray* pList, const SOperatorInfo* pOperator) {
if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
SStreamScanInfo* pScanInfo = pOperator->info;
STableScanInfo* pTableScanInfo = pScanInfo->pTableScanOp->info;
taosArrayPush(pList, &pTableScanInfo->base.pTableListInfo);
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
STableScanInfo* pScanInfo = pOperator->info;
taosArrayPush(pList, &pScanInfo->base.pTableListInfo);
} else {
if (pOperator->pDownstream != NULL && pOperator->pDownstream[0] != NULL) {
extractTableList(pList, pOperator->pDownstream[0]);
}
}
}
SArray* getTableListInfo(const SExecTaskInfo* pTaskInfo) {
SArray* pArray = taosArrayInit(0, POINTER_BYTES);
SOperatorInfo* pOperator = pTaskInfo->pRoot;
extractTableList(pArray, pOperator);
return pArray;
}
\ No newline at end of file
......@@ -20,7 +20,7 @@
#include "tmsg.h"
#include "ttypes.h"
#include "executorimpl.h"
#include "executorInt.h"
#include "tcommon.h"
#include "thash.h"
#include "ttime.h"
......
......@@ -22,12 +22,11 @@
#include "tmsg.h"
#include "executorInt.h"
#include "executorimpl.h"
#include "operator.h"
#include "querytask.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
#include "operator.h"
#include "querytask.h"
typedef struct SGroupbyOperatorInfo {
SOptrBasicInfo binfo;
......
......@@ -13,18 +13,18 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executorInt.h"
#include "filter.h"
#include "executorimpl.h"
#include "function.h"
#include "operator.h"
#include "os.h"
#include "querynodes.h"
#include "querytask.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "thash.h"
#include "tmsg.h"
#include "ttypes.h"
#include "operator.h"
#include "querytask.h"
typedef struct SJoinRowCtx {
bool rowRemains;
......
......@@ -20,12 +20,12 @@
#include "tglobal.h"
#include "executorimpl.h"
#include "executorInt.h"
#include "index.h"
#include "query.h"
#include "vnode.h"
#include "operator.h"
#include "query.h"
#include "querytask.h"
#include "vnode.h"
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup,
__optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn,
......@@ -75,28 +75,6 @@ void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, b
pOperator->pTaskInfo = pTaskInfo;
}
void destroyOperator(SOperatorInfo* pOperator) {
if (pOperator == NULL) {
return;
}
if (pOperator->fpSet.closeFn != NULL) {
pOperator->fpSet.closeFn(pOperator->info);
}
if (pOperator->pDownstream != NULL) {
for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
destroyOperator(pOperator->pDownstream[i]);
}
taosMemoryFreeClear(pOperator->pDownstream);
pOperator->numOfDownstream = 0;
}
cleanupExprSupp(&pOperator->exprSupp);
taosMemoryFreeClear(pOperator);
}
// each operator should be set their own function to return total cost buffer
int32_t optrDefaultBufFn(SOperatorInfo* pOperator) {
if (pOperator->blocking) {
......@@ -106,40 +84,6 @@ int32_t optrDefaultBufFn(SOperatorInfo* pOperator) {
}
}
//int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder) {
// // todo add more information about exchange operation
// int32_t type = pOperator->operatorType;
// if (type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN ||
// type == QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN ||
// type == QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN) {
// *order = TSDB_ORDER_ASC;
// *scanFlag = MAIN_SCAN;
// return TSDB_CODE_SUCCESS;
// } else if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) {
// if (!inheritUsOrder) {
// *order = TSDB_ORDER_ASC;
// }
// *scanFlag = MAIN_SCAN;
// return TSDB_CODE_SUCCESS;
// } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
// STableScanInfo* pTableScanInfo = pOperator->info;
// *order = pTableScanInfo->base.cond.order;
// *scanFlag = pTableScanInfo->base.scanFlag;
// return TSDB_CODE_SUCCESS;
// } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN) {
// STableMergeScanInfo* pTableScanInfo = pOperator->info;
// *order = pTableScanInfo->base.cond.order;
// *scanFlag = pTableScanInfo->base.scanFlag;
// return TSDB_CODE_SUCCESS;
// } else {
// if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) {
// return TSDB_CODE_INVALID_PARA;
// } else {
// return getTableScanInfo(pOperator->pDownstream[0], order, scanFlag, inheritUsOrder);
// }
// }
//}
static int64_t getQuerySupportBufSize(size_t numOfTables) {
size_t s1 = sizeof(STableQueryInfo);
// size_t s3 = sizeof(STableCheckInfo); buffer consumption in tsdb
......@@ -319,7 +263,7 @@ int32_t stopTableScanOperator(SOperatorInfo* pOperator, const char* pIdStr) {
}
SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond,
SNode* pTagIndexCond, const char* pUser, const char* dbname) {
SNode* pTagIndexCond, const char* pUser, const char* dbname) {
int32_t type = nodeType(pPhyNode);
const char* idstr = GET_TASKID(pTaskInfo);
......@@ -347,7 +291,7 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR
code = initQueriedTableSchemaInfo(pHandle, &pTableScanNode->scan, dbname, pTaskInfo);
if (code) {
pTaskInfo->code = terrno;
pTaskInfo->code = code;
tableListDestroy(pTableListInfo);
return NULL;
}
......@@ -355,6 +299,7 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR
pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo);
if (NULL == pOperator) {
pTaskInfo->code = terrno;
tableListDestroy(pTableListInfo);
return NULL;
}
......@@ -578,3 +523,56 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR
return pOptr;
}
void destroyOperator(SOperatorInfo* pOperator) {
if (pOperator == NULL) {
return;
}
if (pOperator->fpSet.closeFn != NULL) {
pOperator->fpSet.closeFn(pOperator->info);
}
if (pOperator->pDownstream != NULL) {
for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
destroyOperator(pOperator->pDownstream[i]);
}
taosMemoryFreeClear(pOperator->pDownstream);
pOperator->numOfDownstream = 0;
}
cleanupExprSupp(&pOperator->exprSupp);
taosMemoryFreeClear(pOperator);
}
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList) {
SExplainExecInfo execInfo = {0};
SExplainExecInfo* pExplainInfo = taosArrayPush(pExecInfoList, &execInfo);
pExplainInfo->numOfRows = operatorInfo->resultInfo.totalRows;
pExplainInfo->startupCost = operatorInfo->cost.openCost;
pExplainInfo->totalCost = operatorInfo->cost.totalCost;
pExplainInfo->verboseLen = 0;
pExplainInfo->verboseInfo = NULL;
if (operatorInfo->fpSet.getExplainFn) {
int32_t code =
operatorInfo->fpSet.getExplainFn(operatorInfo, &pExplainInfo->verboseInfo, &pExplainInfo->verboseLen);
if (code) {
qError("%s operator getExplainFn failed, code:%s", GET_TASKID(operatorInfo->pTaskInfo), tstrerror(code));
return code;
}
}
int32_t code = 0;
for (int32_t i = 0; i < operatorInfo->numOfDownstream; ++i) {
code = getOperatorExplainExecInfo(operatorInfo->pDownstream[i], pExecInfoList);
if (code != TSDB_CODE_SUCCESS) {
// taosMemoryFreeClear(*pRes);
return TSDB_CODE_OUT_OF_MEMORY;
}
}
return TSDB_CODE_SUCCESS;
}
......@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executorimpl.h"
#include "executorInt.h"
#include "filter.h"
#include "functionMgt.h"
#include "operator.h"
......
......@@ -24,14 +24,14 @@
#include "tdatablock.h"
#include "tmsg.h"
#include "executorimpl.h"
#include "executorInt.h"
#include "index.h"
#include "operator.h"
#include "query.h"
#include "querytask.h"
#include "thash.h"
#include "ttypes.h"
#include "vnode.h"
#include "operator.h"
#include "querytask.h"
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
......
......@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executorimpl.h"
#include "executorInt.h"
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
......
......@@ -13,11 +13,11 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executorInt.h"
#include "filter.h"
#include "executorimpl.h"
#include "tdatablock.h"
#include "operator.h"
#include "querytask.h"
#include "tdatablock.h"
typedef struct SSortOperatorInfo {
SOptrBasicInfo binfo;
......
......@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executorimpl.h"
#include "executorInt.h"
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
......
......@@ -20,7 +20,7 @@
#include "tmsg.h"
#include "ttypes.h"
#include "executorimpl.h"
#include "executorInt.h"
#include "tcommon.h"
#include "thash.h"
#include "ttime.h"
......
......@@ -12,17 +12,17 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executorimpl.h"
#include "executorInt.h"
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
#include "operator.h"
#include "querytask.h"
#include "tcommon.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "tfill.h"
#include "ttime.h"
#include "operator.h"
#include "querytask.h"
typedef struct STimeSliceOperatorInfo {
SSDataBlock* pRes;
......
......@@ -12,21 +12,27 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executorimpl.h"
#include "executorInt.h"
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
#include "operator.h"
#include "querytask.h"
#include "tcommon.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "tfill.h"
#include "ttime.h"
#include "operator.h"
#include "querytask.h"
#define IS_FINAL_OP(op) ((op)->isFinal)
#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL);
typedef struct SStateWindowInfo {
SResultWindowInfo winInfo;
SStateKeys* pStateKey;
} SStateWindowInfo;
typedef struct SSessionAggOperatorInfo {
SOptrBasicInfo binfo;
SAggSupporter aggSup;
......
......@@ -24,13 +24,13 @@
#include "os.h"
#include "executor.h"
#include "executorimpl.h"
#include "executorInt.h"
#include "function.h"
#include "operator.h"
#include "taos.h"
#include "tdatablock.h"
#include "tdef.h"
#include "tvariant.h"
#include "operator.h"
namespace {
......
......@@ -15,7 +15,7 @@
#include <gtest/gtest.h>
#include <iostream>
#include "executorimpl.h"
#include "executorInt.h"
#include "tlinearhash.h"
#pragma GCC diagnostic push
......
......@@ -26,7 +26,7 @@
#include "os.h"
#include "executor.h"
#include "executorimpl.h"
#include "executorInt.h"
#include "taos.h"
#include "tcompare.h"
#include "tdatablock.h"
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册