From 46ec8300d9dcb668792d8bb1505916cd53bce7c5 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 14 Apr 2022 22:32:49 +0800 Subject: [PATCH] enh: add tag filter operator --- source/libs/executor/inc/executorimpl.h | 576 ++++++++++++----------- source/libs/executor/inc/indexoperator.h | 20 + source/libs/executor/src/indexoperator.c | 32 ++ 3 files changed, 357 insertions(+), 271 deletions(-) create mode 100644 source/libs/executor/inc/indexoperator.h create mode 100644 source/libs/executor/src/indexoperator.c diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index cbd2dc66f5..18989935b5 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -20,9 +20,9 @@ extern "C" { #endif #include "os.h" -#include "tsort.h" #include "tcommon.h" #include "tlosertree.h" +#include "tsort.h" #include "ttszip.h" #include "tvariant.h" @@ -35,15 +35,15 @@ extern "C" { #include "tarray.h" #include "thash.h" #include "tlockfree.h" -#include "tpagedbuf.h" #include "tmsg.h" +#include "tpagedbuf.h" struct SColumnFilterElem; typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order); -#define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED) -#define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0u) +#define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED) +#define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0u) #define QUERY_IS_ASC_QUERY(q) (GET_FORWARD_DIRECTION_FACTOR((q)->order.order) == QUERY_ASC_FORWARD_STEP) #define GET_TABLEGROUP(q, _index) ((SArray*)taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index))) @@ -67,7 +67,7 @@ enum { }; typedef struct SResultRowCell { - uint64_t groupId; + uint64_t groupId; SResultRowPosition pos; } SResultRowCell; @@ -75,19 +75,19 @@ typedef struct SResultRowCell { * If the number of generated results is greater than this value, * query query will be halt and return results to client immediate. */ -typedef struct SResultInfo { // TODO refactor - int64_t totalRows; // total generated result size in rows - int64_t totalBytes; // total results in bytes. - int32_t capacity; // capacity of current result output buffer - int32_t threshold; // result size threshold in rows. +typedef struct SResultInfo { // TODO refactor + int64_t totalRows; // total generated result size in rows + int64_t totalBytes; // total results in bytes. + int32_t capacity; // capacity of current result output buffer + int32_t threshold; // result size threshold in rows. } SResultInfo; typedef struct STableQueryInfo { - TSKEY lastKey; // last check ts - uint64_t uid; // table uid - int32_t groupIndex; // group id in table list -// SVariant tag; - SResultRowInfo resInfo; // result info + TSKEY lastKey; // last check ts + uint64_t uid; // table uid + int32_t groupIndex; // group id in table list + // SVariant tag; + SResultRowInfo resInfo; // result info } STableQueryInfo; typedef enum { @@ -155,27 +155,27 @@ typedef struct SOperatorCostInfo { // The basic query information extracted from the SQueryInfo tree to support the // execution of query in a data node. typedef struct STaskAttr { - SLimit limit; - SLimit slimit; - bool stableQuery; // super table query or not - bool topBotQuery; // TODO used bitwise flag - bool groupbyColumn; // denote if this is a groupby normal column query - bool timeWindowInterpo; // if the time window start/end required interpolation - bool tsCompQuery; // is tscomp query - bool diffQuery; // is diff query - bool pointInterpQuery; // point interpolation query - int32_t havingNum; // having expr number - int16_t numOfCols; - int16_t numOfTags; - STimeWindow window; - SInterval interval; - int16_t precision; - int16_t numOfOutput; - int16_t fillType; - int32_t resultRowSize; - int32_t tagLen; // tag value length of current query - - SExprInfo *pExpr1; + SLimit limit; + SLimit slimit; + bool stableQuery; // super table query or not + bool topBotQuery; // TODO used bitwise flag + bool groupbyColumn; // denote if this is a groupby normal column query + bool timeWindowInterpo; // if the time window start/end required interpolation + bool tsCompQuery; // is tscomp query + bool diffQuery; // is diff query + bool pointInterpQuery; // point interpolation query + int32_t havingNum; // having expr number + int16_t numOfCols; + int16_t numOfTags; + STimeWindow window; + SInterval interval; + int16_t precision; + int16_t numOfOutput; + int16_t fillType; + int32_t resultRowSize; + int32_t tagLen; // tag value length of current query + + SExprInfo* pExpr1; SColumnInfo* tagColList; int32_t numOfFilterCols; int64_t* fillVal; @@ -188,13 +188,15 @@ struct SOperatorInfo; struct SAggSupporter; struct SOptrBasicInfo; -typedef void (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, struct SAggSupporter *pSup, struct SOptrBasicInfo *pInfo, char **result, int32_t *length); -typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, struct SAggSupporter *pSup, struct SOptrBasicInfo *pInfo, char *result, int32_t length); +typedef void (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, struct SAggSupporter* pSup, + struct SOptrBasicInfo* pInfo, char** result, int32_t* length); +typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, struct SAggSupporter* pSup, + struct SOptrBasicInfo* pInfo, char* result, int32_t length); typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr); typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr, bool* newgroup); typedef void (*__optr_close_fn_t)(void* param, int32_t num); -typedef int32_t (*__optr_get_explain_fn_t)(struct SOperatorInfo* pOptr, void **pOptrExplain); +typedef int32_t (*__optr_get_explain_fn_t)(struct SOperatorInfo* pOptr, void** pOptrExplain); typedef struct STaskIdInfo { uint64_t queryId; // this is also a request id @@ -204,49 +206,49 @@ typedef struct STaskIdInfo { } STaskIdInfo; typedef struct SExecTaskInfo { - STaskIdInfo id; - uint32_t status; - STimeWindow window; - STaskCostInfo cost; - int64_t owner; // if it is in execution - int32_t code; - uint64_t totalRows; // total number of rows - STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure - char* sql; // query sql string - jmp_buf env; // jump to this position when error happens. - EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] + STaskIdInfo id; + uint32_t status; + STimeWindow window; + STaskCostInfo cost; + int64_t owner; // if it is in execution + int32_t code; + uint64_t totalRows; // total number of rows + STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure + char* sql; // query sql string + jmp_buf env; // jump to this position when error happens. + EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] struct SOperatorInfo* pRoot; } SExecTaskInfo; typedef struct STaskRuntimeEnv { - jmp_buf env; - STaskAttr* pQueryAttr; - uint32_t status; // query status - void* qinfo; - uint8_t scanFlag; // denotes reversed scan of data or not - void* pTsdbReadHandle; - - int32_t prevGroupId; // previous executed group id - bool enableGroupData; - SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file - SHashObj* pResultRowHashTable; // quick locate the window object for each result - SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not - SArray* pResultRowArrayList; // The array list that contains the Result rows - char* keyBuf; // window key buffer + jmp_buf env; + STaskAttr* pQueryAttr; + uint32_t status; // query status + void* qinfo; + uint8_t scanFlag; // denotes reversed scan of data or not + void* pTsdbReadHandle; + + int32_t prevGroupId; // previous executed group id + bool enableGroupData; + SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file + SHashObj* pResultRowHashTable; // quick locate the window object for each result + SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not + SArray* pResultRowArrayList; // The array list that contains the Result rows + char* keyBuf; // window key buffer // The window result objects pool, all the resultRow Objects are allocated and managed by this object. - char** prevRow; - SArray* prevResult; // intermediate result, SArray - STSBuf* pTsBuf; // timestamp filter list - STSCursor cur; + char** prevRow; + SArray* prevResult; // intermediate result, SArray + STSBuf* pTsBuf; // timestamp filter list + STSCursor cur; - char* tagVal; // tag value of current data block + char* tagVal; // tag value of current data block struct SScalarFunctionSupport* scalarSup; SSDataBlock* outputBuf; STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure struct SOperatorInfo* proot; - SGroupResInfo groupResInfo; - int64_t currentOffset; // dynamic offset value + SGroupResInfo groupResInfo; + int64_t currentOffset; // dynamic offset value STableQueryInfo* current; SResultInfo resultInfo; @@ -255,10 +257,10 @@ typedef struct STaskRuntimeEnv { } STaskRuntimeEnv; enum { - OP_NOT_OPENED = 0x0, - OP_OPENED = 0x1, + OP_NOT_OPENED = 0x0, + OP_OPENED = 0x1, OP_RES_TO_RETURN = 0x5, - OP_EXEC_DONE = 0x9, + OP_EXEC_DONE = 0x9, }; typedef struct SOperatorInfo { @@ -269,7 +271,7 @@ typedef struct SOperatorInfo { char* name; // name, used to show the query execution plan void* info; // extension attribution SExprInfo* pExpr; - STaskRuntimeEnv* pRuntimeEnv; // todo remove it + STaskRuntimeEnv* pRuntimeEnv; // todo remove it SExecTaskInfo* pTaskInfo; SOperatorCostInfo cost; SResultInfo resultInfo; @@ -277,8 +279,8 @@ typedef struct SOperatorInfo { int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator __optr_open_fn_t _openFn; // DO NOT invoke this function directly __optr_fn_t getNextFn; - __optr_fn_t getStreamResFn; // execute the aggregate in the stream model. - __optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP + __optr_fn_t getStreamResFn; // execute the aggregate in the stream model. + __optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP __optr_close_fn_t closeFn; __optr_encode_fn_t encodeResultRow; __optr_decode_fn_t decodeResultRow; @@ -293,33 +295,33 @@ typedef struct { typedef enum { EX_SOURCE_DATA_NOT_READY = 0x1, - EX_SOURCE_DATA_READY = 0x2, + EX_SOURCE_DATA_READY = 0x2, EX_SOURCE_DATA_EXHAUSTED = 0x3, } EX_SOURCE_STATUS; typedef struct SSourceDataInfo { - struct SExchangeInfo *pEx; + struct SExchangeInfo* pEx; int32_t index; - SRetrieveTableRsp *pRsp; + SRetrieveTableRsp* pRsp; uint64_t totalRows; int32_t code; EX_SOURCE_STATUS status; } SSourceDataInfo; typedef struct SLoadRemoteDataInfo { - uint64_t totalSize; // total load bytes from remote - uint64_t totalRows; // total number of rows - uint64_t totalElapsed; // total elapsed time + uint64_t totalSize; // total load bytes from remote + uint64_t totalRows; // total number of rows + uint64_t totalElapsed; // total elapsed time } SLoadRemoteDataInfo; typedef struct SExchangeInfo { - SArray* pSources; - SArray* pSourceDataInfo; - tsem_t ready; - void* pTransporter; - SSDataBlock* pResult; - bool seqLoadData; // sequential load data or not, false by default - int32_t current; + SArray* pSources; + SArray* pSourceDataInfo; + tsem_t ready; + void* pTransporter; + SSDataBlock* pResult; + bool seqLoadData; // sequential load data or not, false by default + int32_t current; SLoadRemoteDataInfo loadInfo; } SExchangeInfo; @@ -340,7 +342,7 @@ typedef struct STableScanInfo { int32_t current; int32_t reverseTimes; // 0 by default SNode* pFilterNode; // filter operator info - SqlFunctionCtx* pCtx; // next operator query context + SqlFunctionCtx* pCtx; // next operator query context SResultRowInfo* pResultRowInfo; int32_t* rowCellInfoOffset; SExprInfo* pExpr; @@ -349,7 +351,7 @@ typedef struct STableScanInfo { int32_t numOfOutput; int64_t elapsedTime; int32_t prevGroupId; // previous table group id - int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan + int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan } STableScanInfo; typedef struct STagScanInfo { @@ -360,15 +362,15 @@ typedef struct STagScanInfo { } STagScanInfo; typedef struct SStreamBlockScanInfo { - SArray* pBlockLists; // multiple SSDatablock. - SSDataBlock* pRes; // result SSDataBlock - int32_t blockType; // current block type - int32_t validBlockIndex; // Is current data has returned? - SColumnInfo* pCols; // the output column info - uint64_t numOfRows; // total scanned rows - uint64_t numOfExec; // execution times - void* readerHandle; // stream block reader handle - SArray* pColMatchInfo; // + SArray* pBlockLists; // multiple SSDatablock. + SSDataBlock* pRes; // result SSDataBlock + int32_t blockType; // current block type + int32_t validBlockIndex; // Is current data has returned? + SColumnInfo* pCols; // the output column info + uint64_t numOfRows; // total scanned rows + uint64_t numOfExec; // execution times + void* readerHandle; // stream block reader handle + SArray* pColMatchInfo; // } SStreamBlockScanInfo; typedef struct SSysTableScanInfo { @@ -377,83 +379,83 @@ typedef struct SSysTableScanInfo { void* readHandle; }; - SRetrieveMetaTableRsp *pRsp; - SRetrieveTableReq req; - SEpSet epSet; - tsem_t ready; + SRetrieveMetaTableRsp* pRsp; + SRetrieveTableReq req; + SEpSet epSet; + tsem_t ready; - int32_t accountId; - bool showRewrite; - SNode *pCondition; // db_name filter condition, to discard data that are not in current database - void *pCur; // cursor for iterate the local table meta store. - SArray *scanCols; // SArray scan column id list + int32_t accountId; + bool showRewrite; + SNode* pCondition; // db_name filter condition, to discard data that are not in current database + void* pCur; // cursor for iterate the local table meta store. + SArray* scanCols; // SArray scan column id list - int32_t type; // show type, TODO remove it + int32_t type; // show type, TODO remove it SName name; - SSDataBlock *pRes; + SSDataBlock* pRes; int32_t capacity; int64_t numOfBlocks; // extract basic running information. SLoadRemoteDataInfo loadInfo; } SSysTableScanInfo; typedef struct SOptrBasicInfo { - SResultRowInfo resultRowInfo; - int32_t* rowCellInfoOffset; // offset value for each row result cell info - SqlFunctionCtx* pCtx; - SSDataBlock* pRes; - int32_t capacity; // TODO remove it + SResultRowInfo resultRowInfo; + int32_t* rowCellInfoOffset; // offset value for each row result cell info + SqlFunctionCtx* pCtx; + SSDataBlock* pRes; + int32_t capacity; // TODO remove it } SOptrBasicInfo; -//TODO move the resultrowsiz together with SOptrBasicInfo:rowCellInfoOffset +// TODO move the resultrowsiz together with SOptrBasicInfo:rowCellInfoOffset typedef struct SAggSupporter { - SHashObj* pResultRowHashTable; // quick locate the window object for each result - SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not - SArray* pResultRowArrayList; // The array list that contains the Result rows - char* keyBuf; // window key buffer - SDiskbasedBuf *pResultBuf; // query result buffer based on blocked-wised disk file - int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row + SHashObj* pResultRowHashTable; // quick locate the window object for each result + SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not + SArray* pResultRowArrayList; // The array list that contains the Result rows + char* keyBuf; // window key buffer + SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file + int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row } SAggSupporter; typedef struct STableIntervalOperatorInfo { - SOptrBasicInfo binfo; // basic info - SGroupResInfo groupResInfo; // multiple results build supporter - SInterval interval; // interval info - int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator. - STimeWindow win; // query time range - bool timeWindowInterpo; // interpolation needed or not - char **pRow; // previous row/tuple of already processed datablock - SAggSupporter aggSup; // aggregate supporter - STableQueryInfo *pCurrent; // current tableQueryInfo struct - int32_t order; // current SSDataBlock scan order - EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] - SArray *pUpdatedWindow; // updated time window due to the input data block from the downstream operator. - SColumnInfoData timeWindowData; // query time window info for scalar function execution. + SOptrBasicInfo binfo; // basic info + SGroupResInfo groupResInfo; // multiple results build supporter + SInterval interval; // interval info + int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator. + STimeWindow win; // query time range + bool timeWindowInterpo; // interpolation needed or not + char** pRow; // previous row/tuple of already processed datablock + SAggSupporter aggSup; // aggregate supporter + STableQueryInfo* pCurrent; // current tableQueryInfo struct + int32_t order; // current SSDataBlock scan order + EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] + SArray* pUpdatedWindow; // updated time window due to the input data block from the downstream operator. + SColumnInfoData timeWindowData; // query time window info for scalar function execution. } STableIntervalOperatorInfo; typedef struct SAggOperatorInfo { - SOptrBasicInfo binfo; - SDiskbasedBuf *pResultBuf; // query result buffer based on blocked-wised disk file - SAggSupporter aggSup; - STableQueryInfo *current; - uint32_t groupId; - SGroupResInfo groupResInfo; - STableQueryInfo *pTableQueryInfo; + SOptrBasicInfo binfo; + SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file + SAggSupporter aggSup; + STableQueryInfo* current; + uint32_t groupId; + SGroupResInfo groupResInfo; + STableQueryInfo* pTableQueryInfo; } SAggOperatorInfo; typedef struct SProjectOperatorInfo { SOptrBasicInfo binfo; SAggSupporter aggSup; - SSDataBlock *existDataBlock; - SArray *pPseudoColInfo; + SSDataBlock* existDataBlock; + SArray* pPseudoColInfo; SLimit limit; SLimit slimit; - uint64_t groupId; - int64_t curSOffset; - int64_t curGroupOutput; + uint64_t groupId; + int64_t curSOffset; + int64_t curGroupOutput; - int64_t curOffset; - int64_t curOutput; + int64_t curOffset; + int64_t curOutput; } SProjectOperatorInfo; typedef struct SFillOperatorInfo { @@ -468,165 +470,192 @@ typedef struct SFillOperatorInfo { } SFillOperatorInfo; typedef struct { - char *pData; - bool isNull; - int16_t type; - int32_t bytes; + char* pData; + bool isNull; + int16_t type; + int32_t bytes; } SGroupKeys, SStateKeys; typedef struct SGroupbyOperatorInfo { - SOptrBasicInfo binfo; - SArray* pGroupCols; - SArray* pGroupColVals; // current group column values, SArray - SNode* pCondition; - bool isInit; // denote if current val is initialized or not - char* keyBuf; // group by keys for hash - int32_t groupKeyLen; // total group by column width - SGroupResInfo groupResInfo; - SAggSupporter aggSup; - SExprInfo* pScalarExprInfo; - int32_t numOfScalarExpr;// the number of scalar expression in group operator - SqlFunctionCtx*pScalarFuncCtx; + SOptrBasicInfo binfo; + SArray* pGroupCols; + SArray* pGroupColVals; // current group column values, SArray + SNode* pCondition; + bool isInit; // denote if current val is initialized or not + char* keyBuf; // group by keys for hash + int32_t groupKeyLen; // total group by column width + SGroupResInfo groupResInfo; + SAggSupporter aggSup; + SExprInfo* pScalarExprInfo; + int32_t numOfScalarExpr; // the number of scalar expression in group operator + SqlFunctionCtx* pScalarFuncCtx; } SGroupbyOperatorInfo; typedef struct SDataGroupInfo { uint64_t groupId; int64_t numOfRows; - SArray *pPageList; + SArray* pPageList; } SDataGroupInfo; // The sort in partition may be needed later. typedef struct SPartitionOperatorInfo { - SOptrBasicInfo binfo; - SArray* pGroupCols; - SArray* pGroupColVals; // current group column values, SArray - char* keyBuf; // group by keys for hash - int32_t groupKeyLen; // total group by column width - SHashObj* pGroupSet; // quick locate the window object for each result - - SDiskbasedBuf* pBuf; // query result buffer based on blocked-wised disk file - int32_t rowCapacity; // maximum number of rows for each buffer page - int32_t* columnOffset; // start position for each column data - - void* pGroupIter; // group iterator - int32_t pageIndex; // page index of current group + SOptrBasicInfo binfo; + SArray* pGroupCols; + SArray* pGroupColVals; // current group column values, SArray + char* keyBuf; // group by keys for hash + int32_t groupKeyLen; // total group by column width + SHashObj* pGroupSet; // quick locate the window object for each result + + SDiskbasedBuf* pBuf; // query result buffer based on blocked-wised disk file + int32_t rowCapacity; // maximum number of rows for each buffer page + int32_t* columnOffset; // start position for each column data + + void* pGroupIter; // group iterator + int32_t pageIndex; // page index of current group } SPartitionOperatorInfo; typedef struct SWindowRowsSup { - STimeWindow win; - TSKEY prevTs; - int32_t startRowIndex; - int32_t numOfRows; + STimeWindow win; + TSKEY prevTs; + int32_t startRowIndex; + int32_t numOfRows; } SWindowRowsSup; typedef struct SSessionAggOperatorInfo { - SOptrBasicInfo binfo; - SAggSupporter aggSup; - SGroupResInfo groupResInfo; - SWindowRowsSup winSup; - bool reptScan; // next round scan - int64_t gap; // session window gap - SColumnInfoData timeWindowData; // query time window info for scalar function execution. + SOptrBasicInfo binfo; + SAggSupporter aggSup; + SGroupResInfo groupResInfo; + SWindowRowsSup winSup; + bool reptScan; // next round scan + int64_t gap; // session window gap + SColumnInfoData timeWindowData; // query time window info for scalar function execution. } SSessionAggOperatorInfo; typedef struct STimeSliceOperatorInfo { - SOptrBasicInfo binfo; - SInterval interval; - SGroupResInfo groupResInfo; // multiple results build supporter + SOptrBasicInfo binfo; + SInterval interval; + SGroupResInfo groupResInfo; // multiple results build supporter } STimeSliceOperatorInfo; typedef struct SStateWindowOperatorInfo { - SOptrBasicInfo binfo; - SAggSupporter aggSup; - SGroupResInfo groupResInfo; - SWindowRowsSup winSup; - int32_t colIndex; // start row index - bool hasKey; - SStateKeys stateKey; - SColumnInfoData timeWindowData; // query time window info for scalar function execution. -// bool reptScan; + SOptrBasicInfo binfo; + SAggSupporter aggSup; + SGroupResInfo groupResInfo; + SWindowRowsSup winSup; + int32_t colIndex; // start row index + bool hasKey; + SStateKeys stateKey; + SColumnInfoData timeWindowData; // query time window info for scalar function execution. + // bool reptScan; } SStateWindowOperatorInfo; typedef struct SSortedMergeOperatorInfo { - SOptrBasicInfo binfo; - bool hasVarCol; - - SArray* pSortInfo; - int32_t numOfSources; - SSortHandle *pSortHandle; - int32_t bufPageSize; - uint32_t sortBufSize; // max buffer size for in-memory sort - int32_t resultRowFactor; - bool hasGroupVal; - SDiskbasedBuf *pTupleStore; // keep the final results - int32_t numOfResPerPage; - char** groupVal; - SArray *groupInfo; - SAggSupporter aggSup; + SOptrBasicInfo binfo; + bool hasVarCol; + + SArray* pSortInfo; + int32_t numOfSources; + SSortHandle* pSortHandle; + int32_t bufPageSize; + uint32_t sortBufSize; // max buffer size for in-memory sort + int32_t resultRowFactor; + bool hasGroupVal; + SDiskbasedBuf* pTupleStore; // keep the final results + int32_t numOfResPerPage; + char** groupVal; + SArray* groupInfo; + SAggSupporter aggSup; } SSortedMergeOperatorInfo; typedef struct SSortOperatorInfo { - uint32_t sortBufSize; // max buffer size for in-memory sort - SSDataBlock *pDataBlock; - SArray* pSortInfo; - SSortHandle *pSortHandle; - SArray* inputSlotMap; // for index map from table scan output - int32_t bufPageSize; - int32_t numOfRowsInRes; + uint32_t sortBufSize; // max buffer size for in-memory sort + SSDataBlock* pDataBlock; + SArray* pSortInfo; + SSortHandle* pSortHandle; + SArray* inputSlotMap; // for index map from table scan output + int32_t bufPageSize; + int32_t numOfRowsInRes; // TODO extact struct - int64_t startTs; // sort start time - uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included. - uint64_t totalSize; // total load bytes from remote - uint64_t totalRows; // total number of rows - uint64_t totalElapsed; // total elapsed time + int64_t startTs; // sort start time + uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included. + uint64_t totalSize; // total load bytes from remote + uint64_t totalRows; // total number of rows + uint64_t totalElapsed; // total elapsed time } SSortOperatorInfo; +typedef struct STagFilterOperatorInfo { + SOptrBasicInfo binfo; +} STagFilterOperatorInfo; + int32_t operatorDummyOpenFn(SOperatorInfo* pOperator); -void operatorDummyCloseFn(void* param, int32_t numOfCols); +void operatorDummyCloseFn(void* param, int32_t numOfCols); int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num); int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, int32_t numOfRows, SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey); -void toSDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, int32_t* rowCellOffset); -void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset); -void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order); -int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int16_t bytes, int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup); -void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput); -int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, - char* pData, int32_t compLen, int32_t numOfOutput, int64_t startTs, - uint64_t* total, SArray* pColList); -void doSetOperatorCompleted(SOperatorInfo* pOperator); -void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock); +void toSDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, + SDiskbasedBuf* pBuf, int32_t* rowCellOffset); +void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, + SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset); +void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, + int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order); +int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int16_t bytes, + int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup); +void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput); +int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData, + int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total, + SArray* pColList); +void doSetOperatorCompleted(SOperatorInfo* pOperator); +void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock); SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset); SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t repeatTime, - int32_t reverseTime, SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, - SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); -SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); - -SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SLimit* pSlimit, SExecTaskInfo* pTaskInfo); -SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pIndexMap, SExecTaskInfo* pTaskInfo); - -SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); + int32_t reverseTime, SArray* pColMatchInfo, SSDataBlock* pResBlock, + SNode* pCondition, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, + SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, + const STableGroupInfo* pTableGroupInfo); +SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, + SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, + const STableGroupInfo* pTableGroupInfo); + +SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, + SSDataBlock* pResBlock, SLimit* pLimit, SLimit* pSlimit, + SExecTaskInfo* pTaskInfo); +SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, + SArray* pIndexMap, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, + int32_t num, SArray* pSortInfo, SArray* pGroupInfo, + SExecTaskInfo* pTaskInfo); SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName, - SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId); -SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlot, + SNode* pCondition, SEpSet epset, SArray* colList, + SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId); +SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, + SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlot, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, - SNode* pCondition, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); +SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, + SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, + SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition, + SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, + const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pResBlock, SArray* pColList, SArray* pTableIdList, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pResBlock, SArray* pColList, + SArray* pTableIdList, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock, - int32_t fillType, char* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, + SInterval* pInterval, SSDataBlock* pResBlock, int32_t fillType, char* fillVal, + bool multigroupResult, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, + SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, - SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); -SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, + SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo, + const STableGroupInfo* pTableGroupInfo); +SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, + SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo); #if 0 SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); @@ -640,7 +669,8 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOf int32_t numOfOutput); #endif -void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, int32_t numOfOutput, SArray* pPseudoList); +void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, + int32_t numOfOutput, SArray* pPseudoList); void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); @@ -652,23 +682,27 @@ STableQueryInfo* createTableQueryInfo(void* buf, bool groupbyColumn, STimeWindow bool isTaskKilled(SExecTaskInfo* pTaskInfo); int32_t checkForQueryBuf(size_t numOfTables); -void setTaskKilled(SExecTaskInfo* pTaskInfo); +void setTaskKilled(SExecTaskInfo* pTaskInfo); void publishOperatorProfEvent(SOperatorInfo* operatorInfo, EQueryProfEventType eventType); void publishQueryAbortEvent(SExecTaskInfo* pTaskInfo, int32_t code); void queryCostStatis(SExecTaskInfo* pTaskInfo); -void doDestroyTask(SExecTaskInfo* pTaskInfo); +void doDestroyTask(SExecTaskInfo* pTaskInfo); int32_t getMaximumIdleDurationSec(); void doInvokeUdf(struct SUdfInfo* pUdfInfo, SqlFunctionCtx* pCtx, int32_t idx, int32_t type); void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); -int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, EOPTR_EXEC_MODEL model); -int32_t getOperatorExplainExecInfo(SOperatorInfo *operatorInfo, SExplainExecInfo **pRes, int32_t *capacity, int32_t *resNum); - -bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter *pSup, SOptrBasicInfo *pInfo, char* result, int32_t length); -void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter *pSup, SOptrBasicInfo *pInfo, char **result, int32_t *length); +int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, + EOPTR_EXEC_MODEL model); +int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo** pRes, int32_t* capacity, + int32_t* resNum); + +bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasicInfo* pInfo, char* result, + int32_t length); +void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasicInfo* pInfo, char** result, + int32_t* length); #ifdef __cplusplus } diff --git a/source/libs/executor/inc/indexoperator.h b/source/libs/executor/inc/indexoperator.h new file mode 100644 index 0000000000..3113945c13 --- /dev/null +++ b/source/libs/executor/inc/indexoperator.h @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "filter.h" +#include "tglobal.h" + +// construct tag filter operator later +int32_t doFilterTag(const SNode *pFilterNode, SArray *resutl); diff --git a/source/libs/executor/src/indexoperator.c b/source/libs/executor/src/indexoperator.c new file mode 100644 index 0000000000..b733ecdc6b --- /dev/null +++ b/source/libs/executor/src/indexoperator.c @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "indexoperator.h" +#include "executorimpl.h" +// construct tag filter operator later +static void destroyTagFilterOperatorInfo(void *param) { + STagFilterOperatorInfo *pInfo = (STagFilterOperatorInfo *)param; +} +int32_t doFilterTag(const SNode *pFilterNode, SArray *resutl) { + if (pFilterNode == NULL) { + return TSDB_CODE_SUCCESS; + } + + SFilterInfo *filter = NULL; + + // todo move to the initialization function + int32_t code = filterInitFromNode((SNode *)pFilterNode, &filter, 0); + return code; +} -- GitLab