diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index ea2939055edcfe427389b55428929b5b6d8222a8..51dfeed7aefb46cda30471acc9796b66fbe8a7ab 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -20,9 +20,9 @@ extern "C" { #endif #include "os.h" -#include "tsort.h" #include "tcommon.h" #include "tlosertree.h" +#include "tsort.h" #include "ttszip.h" #include "tvariant.h" @@ -35,13 +35,13 @@ extern "C" { #include "tarray.h" #include "thash.h" #include "tlockfree.h" -#include "tpagedbuf.h" #include "tmsg.h" +#include "tpagedbuf.h" typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order); -#define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED) -#define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0u) +#define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED) +#define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0u) #define QUERY_IS_ASC_QUERY(q) (GET_FORWARD_DIRECTION_FACTOR((q)->order.order) == QUERY_ASC_FORWARD_STEP) #define GET_TABLEGROUP(q, _index) ((SArray*)taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index))) @@ -65,7 +65,7 @@ enum { }; typedef struct SResultRowCell { - uint64_t groupId; + uint64_t groupId; SResultRowPosition pos; } SResultRowCell; @@ -73,19 +73,19 @@ typedef struct SResultRowCell { * If the number of generated results is greater than this value, * query query will be halt and return results to client immediate. */ -typedef struct SResultInfo { // TODO refactor - int64_t totalRows; // total generated result size in rows - int64_t totalBytes; // total results in bytes. - int32_t capacity; // capacity of current result output buffer - int32_t threshold; // result size threshold in rows. +typedef struct SResultInfo { // TODO refactor + int64_t totalRows; // total generated result size in rows + int64_t totalBytes; // total results in bytes. + int32_t capacity; // capacity of current result output buffer + int32_t threshold; // result size threshold in rows. } SResultInfo; typedef struct STableQueryInfo { - TSKEY lastKey; // last check ts - uint64_t uid; // table uid - int32_t groupIndex; // group id in table list -// SVariant tag; - SResultRowInfo resInfo; // result info + TSKEY lastKey; // last check ts + uint64_t uid; // table uid + int32_t groupIndex; // group id in table list + // SVariant tag; + SResultRowInfo resInfo; // result info } STableQueryInfo; typedef enum { @@ -153,27 +153,27 @@ typedef struct SOperatorCostInfo { // The basic query information extracted from the SQueryInfo tree to support the // execution of query in a data node. typedef struct STaskAttr { - SLimit limit; - SLimit slimit; - bool stableQuery; // super table query or not - bool topBotQuery; // TODO used bitwise flag - bool groupbyColumn; // denote if this is a groupby normal column query - bool timeWindowInterpo; // if the time window start/end required interpolation - bool tsCompQuery; // is tscomp query - bool diffQuery; // is diff query - bool pointInterpQuery; // point interpolation query - int32_t havingNum; // having expr number - int16_t numOfCols; - int16_t numOfTags; - STimeWindow window; - SInterval interval; - int16_t precision; - int16_t numOfOutput; - int16_t fillType; - int32_t resultRowSize; - int32_t tagLen; // tag value length of current query - - SExprInfo *pExpr1; + SLimit limit; + SLimit slimit; + bool stableQuery; // super table query or not + bool topBotQuery; // TODO used bitwise flag + bool groupbyColumn; // denote if this is a groupby normal column query + bool timeWindowInterpo; // if the time window start/end required interpolation + bool tsCompQuery; // is tscomp query + bool diffQuery; // is diff query + bool pointInterpQuery; // point interpolation query + int32_t havingNum; // having expr number + int16_t numOfCols; + int16_t numOfTags; + STimeWindow window; + SInterval interval; + int16_t precision; + int16_t numOfOutput; + int16_t fillType; + int32_t resultRowSize; + int32_t tagLen; // tag value length of current query + + SExprInfo* pExpr1; SColumnInfo* tagColList; int32_t numOfFilterCols; int64_t* fillVal; @@ -186,13 +186,15 @@ struct SOperatorInfo; struct SAggSupporter; struct SOptrBasicInfo; -typedef void (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, struct SAggSupporter *pSup, struct SOptrBasicInfo *pInfo, char **result, int32_t *length); -typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, struct SAggSupporter *pSup, struct SOptrBasicInfo *pInfo, char *result, int32_t length); +typedef void (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, struct SAggSupporter* pSup, + struct SOptrBasicInfo* pInfo, char** result, int32_t* length); +typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, struct SAggSupporter* pSup, + struct SOptrBasicInfo* pInfo, char* result, int32_t length); typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr); typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr, bool* newgroup); typedef void (*__optr_close_fn_t)(void* param, int32_t num); -typedef int32_t (*__optr_get_explain_fn_t)(struct SOperatorInfo* pOptr, void **pOptrExplain); +typedef int32_t (*__optr_get_explain_fn_t)(struct SOperatorInfo* pOptr, void** pOptrExplain); typedef struct STaskIdInfo { uint64_t queryId; // this is also a request id @@ -202,21 +204,22 @@ typedef struct STaskIdInfo { } STaskIdInfo; typedef struct SExecTaskInfo { - STaskIdInfo id; - uint32_t status; - STimeWindow window; - STaskCostInfo cost; - int64_t owner; // if it is in execution - int32_t code; - uint64_t totalRows; // total number of rows - STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure - char* sql; // query sql string - jmp_buf env; // jump to this position when error happens. - EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] + STaskIdInfo id; + uint32_t status; + STimeWindow window; + STaskCostInfo cost; + int64_t owner; // if it is in execution + int32_t code; + uint64_t totalRows; // total number of rows + STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure + char* sql; // query sql string + jmp_buf env; // jump to this position when error happens. + EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] struct SOperatorInfo* pRoot; } SExecTaskInfo; typedef struct STaskRuntimeEnv { + jmp_buf env; STaskAttr* pQueryAttr; uint32_t status; // query status @@ -230,17 +233,17 @@ typedef struct STaskRuntimeEnv { SArray* pResultRowArrayList; // The array list that contains the Result rows char* keyBuf; // window key buffer // The window result objects pool, all the resultRow Objects are allocated and managed by this object. - char** prevRow; - SArray* prevResult; // intermediate result, SArray - STSBuf* pTsBuf; // timestamp filter list - STSCursor cur; + char** prevRow; + SArray* prevResult; // intermediate result, SArray + STSBuf* pTsBuf; // timestamp filter list + STSCursor cur; - char* tagVal; // tag value of current data block + char* tagVal; // tag value of current data block struct SScalarFunctionSupport* scalarSup; STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure struct SOperatorInfo* proot; - SGroupResInfo groupResInfo; - int64_t currentOffset; // dynamic offset value + SGroupResInfo groupResInfo; + int64_t currentOffset; // dynamic offset value STableQueryInfo* current; SResultInfo resultInfo; @@ -248,10 +251,10 @@ typedef struct STaskRuntimeEnv { } STaskRuntimeEnv; enum { - OP_NOT_OPENED = 0x0, - OP_OPENED = 0x1, + OP_NOT_OPENED = 0x0, + OP_OPENED = 0x1, OP_RES_TO_RETURN = 0x5, - OP_EXEC_DONE = 0x9, + OP_EXEC_DONE = 0x9, }; typedef struct SOperatorInfo { @@ -262,7 +265,7 @@ typedef struct SOperatorInfo { char* name; // name, used to show the query execution plan void* info; // extension attribution SExprInfo* pExpr; - STaskRuntimeEnv* pRuntimeEnv; // todo remove it + STaskRuntimeEnv* pRuntimeEnv; // todo remove it SExecTaskInfo* pTaskInfo; SOperatorCostInfo cost; SResultInfo resultInfo; @@ -270,8 +273,8 @@ typedef struct SOperatorInfo { int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator __optr_open_fn_t _openFn; // DO NOT invoke this function directly __optr_fn_t getNextFn; - __optr_fn_t getStreamResFn; // execute the aggregate in the stream model. - __optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP + __optr_fn_t getStreamResFn; // execute the aggregate in the stream model. + __optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP __optr_close_fn_t closeFn; __optr_encode_fn_t encodeResultRow; __optr_decode_fn_t decodeResultRow; @@ -286,33 +289,33 @@ typedef struct { typedef enum { EX_SOURCE_DATA_NOT_READY = 0x1, - EX_SOURCE_DATA_READY = 0x2, + EX_SOURCE_DATA_READY = 0x2, EX_SOURCE_DATA_EXHAUSTED = 0x3, } EX_SOURCE_STATUS; typedef struct SSourceDataInfo { - struct SExchangeInfo *pEx; + struct SExchangeInfo* pEx; int32_t index; - SRetrieveTableRsp *pRsp; + SRetrieveTableRsp* pRsp; uint64_t totalRows; int32_t code; EX_SOURCE_STATUS status; } SSourceDataInfo; typedef struct SLoadRemoteDataInfo { - uint64_t totalSize; // total load bytes from remote - uint64_t totalRows; // total number of rows - uint64_t totalElapsed; // total elapsed time + uint64_t totalSize; // total load bytes from remote + uint64_t totalRows; // total number of rows + uint64_t totalElapsed; // total elapsed time } SLoadRemoteDataInfo; typedef struct SExchangeInfo { - SArray* pSources; - SArray* pSourceDataInfo; - tsem_t ready; - void* pTransporter; - SSDataBlock* pResult; - bool seqLoadData; // sequential load data or not, false by default - int32_t current; + SArray* pSources; + SArray* pSourceDataInfo; + tsem_t ready; + void* pTransporter; + SSDataBlock* pResult; + bool seqLoadData; // sequential load data or not, false by default + int32_t current; SLoadRemoteDataInfo loadInfo; } SExchangeInfo; @@ -333,7 +336,7 @@ typedef struct STableScanInfo { int32_t current; int32_t reverseTimes; // 0 by default SNode* pFilterNode; // filter operator info - SqlFunctionCtx* pCtx; // next operator query context + SqlFunctionCtx* pCtx; // next operator query context SResultRowInfo* pResultRowInfo; int32_t* rowCellInfoOffset; SExprInfo* pExpr; @@ -342,6 +345,7 @@ typedef struct STableScanInfo { int32_t numOfOutput; int64_t elapsedTime; int32_t prevGroupId; // previous table group id + int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan int32_t dataBlockLoadFlag; } STableScanInfo; @@ -354,15 +358,15 @@ typedef struct STagScanInfo { } STagScanInfo; typedef struct SStreamBlockScanInfo { - SArray* pBlockLists; // multiple SSDatablock. - SSDataBlock* pRes; // result SSDataBlock - int32_t blockType; // current block type - int32_t validBlockIndex; // Is current data has returned? - SColumnInfo* pCols; // the output column info - uint64_t numOfRows; // total scanned rows - uint64_t numOfExec; // execution times - void* readerHandle; // stream block reader handle - SArray* pColMatchInfo; // + SArray* pBlockLists; // multiple SSDatablock. + SSDataBlock* pRes; // result SSDataBlock + int32_t blockType; // current block type + int32_t validBlockIndex; // Is current data has returned? + SColumnInfo* pCols; // the output column info + uint64_t numOfRows; // total scanned rows + uint64_t numOfExec; // execution times + void* readerHandle; // stream block reader handle + SArray* pColMatchInfo; // } SStreamBlockScanInfo; typedef struct SSysTableScanInfo { @@ -371,83 +375,83 @@ typedef struct SSysTableScanInfo { void* readHandle; }; - SRetrieveMetaTableRsp *pRsp; - SRetrieveTableReq req; - SEpSet epSet; - tsem_t ready; + SRetrieveMetaTableRsp* pRsp; + SRetrieveTableReq req; + SEpSet epSet; + tsem_t ready; - int32_t accountId; - bool showRewrite; - SNode *pCondition; // db_name filter condition, to discard data that are not in current database - void *pCur; // cursor for iterate the local table meta store. - SArray *scanCols; // SArray scan column id list + int32_t accountId; + bool showRewrite; + SNode* pCondition; // db_name filter condition, to discard data that are not in current database + void* pCur; // cursor for iterate the local table meta store. + SArray* scanCols; // SArray scan column id list - int32_t type; // show type, TODO remove it + int32_t type; // show type, TODO remove it SName name; - SSDataBlock *pRes; + SSDataBlock* pRes; int32_t capacity; int64_t numOfBlocks; // extract basic running information. SLoadRemoteDataInfo loadInfo; } SSysTableScanInfo; typedef struct SOptrBasicInfo { - SResultRowInfo resultRowInfo; - int32_t* rowCellInfoOffset; // offset value for each row result cell info - SqlFunctionCtx* pCtx; - SSDataBlock* pRes; - int32_t capacity; // TODO remove it + SResultRowInfo resultRowInfo; + int32_t* rowCellInfoOffset; // offset value for each row result cell info + SqlFunctionCtx* pCtx; + SSDataBlock* pRes; + int32_t capacity; // TODO remove it } SOptrBasicInfo; -//TODO move the resultrowsiz together with SOptrBasicInfo:rowCellInfoOffset +// TODO move the resultrowsiz together with SOptrBasicInfo:rowCellInfoOffset typedef struct SAggSupporter { - SHashObj* pResultRowHashTable; // quick locate the window object for each result - SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not - SArray* pResultRowArrayList; // The array list that contains the Result rows - char* keyBuf; // window key buffer - SDiskbasedBuf *pResultBuf; // query result buffer based on blocked-wised disk file - int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row + SHashObj* pResultRowHashTable; // quick locate the window object for each result + SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not + SArray* pResultRowArrayList; // The array list that contains the Result rows + char* keyBuf; // window key buffer + SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file + int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row } SAggSupporter; typedef struct STableIntervalOperatorInfo { - SOptrBasicInfo binfo; // basic info - SGroupResInfo groupResInfo; // multiple results build supporter - SInterval interval; // interval info - int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator. - STimeWindow win; // query time range - bool timeWindowInterpo; // interpolation needed or not - char **pRow; // previous row/tuple of already processed datablock - SAggSupporter aggSup; // aggregate supporter - STableQueryInfo *pCurrent; // current tableQueryInfo struct - int32_t order; // current SSDataBlock scan order - EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] - SArray *pUpdatedWindow; // updated time window due to the input data block from the downstream operator. - SColumnInfoData timeWindowData; // query time window info for scalar function execution. + SOptrBasicInfo binfo; // basic info + SGroupResInfo groupResInfo; // multiple results build supporter + SInterval interval; // interval info + int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator. + STimeWindow win; // query time range + bool timeWindowInterpo; // interpolation needed or not + char** pRow; // previous row/tuple of already processed datablock + SAggSupporter aggSup; // aggregate supporter + STableQueryInfo* pCurrent; // current tableQueryInfo struct + int32_t order; // current SSDataBlock scan order + EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] + SArray* pUpdatedWindow; // updated time window due to the input data block from the downstream operator. + SColumnInfoData timeWindowData; // query time window info for scalar function execution. } STableIntervalOperatorInfo; typedef struct SAggOperatorInfo { - SOptrBasicInfo binfo; - SDiskbasedBuf *pResultBuf; // query result buffer based on blocked-wised disk file - SAggSupporter aggSup; - STableQueryInfo *current; - uint32_t groupId; - SGroupResInfo groupResInfo; - STableQueryInfo *pTableQueryInfo; + SOptrBasicInfo binfo; + SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file + SAggSupporter aggSup; + STableQueryInfo* current; + uint32_t groupId; + SGroupResInfo groupResInfo; + STableQueryInfo* pTableQueryInfo; } SAggOperatorInfo; typedef struct SProjectOperatorInfo { SOptrBasicInfo binfo; SAggSupporter aggSup; - SSDataBlock *existDataBlock; - SArray *pPseudoColInfo; + SSDataBlock* existDataBlock; + SArray* pPseudoColInfo; SLimit limit; SLimit slimit; - uint64_t groupId; - int64_t curSOffset; - int64_t curGroupOutput; + uint64_t groupId; + int64_t curSOffset; + int64_t curGroupOutput; - int64_t curOffset; - int64_t curOutput; + int64_t curOffset; + int64_t curOutput; } SProjectOperatorInfo; typedef struct SFillOperatorInfo { @@ -462,86 +466,87 @@ typedef struct SFillOperatorInfo { } SFillOperatorInfo; typedef struct { - char *pData; - bool isNull; - int16_t type; - int32_t bytes; + char* pData; + bool isNull; + int16_t type; + int32_t bytes; } SGroupKeys, SStateKeys; typedef struct SGroupbyOperatorInfo { - SOptrBasicInfo binfo; - SArray* pGroupCols; - SArray* pGroupColVals; // current group column values, SArray - SNode* pCondition; - bool isInit; // denote if current val is initialized or not - char* keyBuf; // group by keys for hash - int32_t groupKeyLen; // total group by column width - SGroupResInfo groupResInfo; - SAggSupporter aggSup; - SExprInfo* pScalarExprInfo; - int32_t numOfScalarExpr;// the number of scalar expression in group operator - SqlFunctionCtx*pScalarFuncCtx; + SOptrBasicInfo binfo; + SArray* pGroupCols; + SArray* pGroupColVals; // current group column values, SArray + SNode* pCondition; + bool isInit; // denote if current val is initialized or not + char* keyBuf; // group by keys for hash + int32_t groupKeyLen; // total group by column width + SGroupResInfo groupResInfo; + SAggSupporter aggSup; + SExprInfo* pScalarExprInfo; + int32_t numOfScalarExpr; // the number of scalar expression in group operator + SqlFunctionCtx* pScalarFuncCtx; } SGroupbyOperatorInfo; typedef struct SDataGroupInfo { uint64_t groupId; int64_t numOfRows; - SArray *pPageList; + SArray* pPageList; } SDataGroupInfo; // The sort in partition may be needed later. typedef struct SPartitionOperatorInfo { - SOptrBasicInfo binfo; - SArray* pGroupCols; - SArray* pGroupColVals; // current group column values, SArray - char* keyBuf; // group by keys for hash - int32_t groupKeyLen; // total group by column width - SHashObj* pGroupSet; // quick locate the window object for each result - - SDiskbasedBuf* pBuf; // query result buffer based on blocked-wised disk file - int32_t rowCapacity; // maximum number of rows for each buffer page - int32_t* columnOffset; // start position for each column data - - void* pGroupIter; // group iterator - int32_t pageIndex; // page index of current group + SOptrBasicInfo binfo; + SArray* pGroupCols; + SArray* pGroupColVals; // current group column values, SArray + char* keyBuf; // group by keys for hash + int32_t groupKeyLen; // total group by column width + SHashObj* pGroupSet; // quick locate the window object for each result + + SDiskbasedBuf* pBuf; // query result buffer based on blocked-wised disk file + int32_t rowCapacity; // maximum number of rows for each buffer page + int32_t* columnOffset; // start position for each column data + + void* pGroupIter; // group iterator + int32_t pageIndex; // page index of current group } SPartitionOperatorInfo; typedef struct SWindowRowsSup { - STimeWindow win; - TSKEY prevTs; - int32_t startRowIndex; - int32_t numOfRows; + STimeWindow win; + TSKEY prevTs; + int32_t startRowIndex; + int32_t numOfRows; } SWindowRowsSup; typedef struct SSessionAggOperatorInfo { - SOptrBasicInfo binfo; - SAggSupporter aggSup; - SGroupResInfo groupResInfo; - SWindowRowsSup winSup; - bool reptScan; // next round scan - int64_t gap; // session window gap - SColumnInfoData timeWindowData; // query time window info for scalar function execution. + SOptrBasicInfo binfo; + SAggSupporter aggSup; + SGroupResInfo groupResInfo; + SWindowRowsSup winSup; + bool reptScan; // next round scan + int64_t gap; // session window gap + SColumnInfoData timeWindowData; // query time window info for scalar function execution. } SSessionAggOperatorInfo; typedef struct STimeSliceOperatorInfo { - SOptrBasicInfo binfo; - SInterval interval; - SGroupResInfo groupResInfo; // multiple results build supporter + SOptrBasicInfo binfo; + SInterval interval; + SGroupResInfo groupResInfo; // multiple results build supporter } STimeSliceOperatorInfo; typedef struct SStateWindowOperatorInfo { - SOptrBasicInfo binfo; - SAggSupporter aggSup; - SGroupResInfo groupResInfo; - SWindowRowsSup winSup; - int32_t colIndex; // start row index - bool hasKey; - SStateKeys stateKey; - SColumnInfoData timeWindowData; // query time window info for scalar function execution. -// bool reptScan; + SOptrBasicInfo binfo; + SAggSupporter aggSup; + SGroupResInfo groupResInfo; + SWindowRowsSup winSup; + int32_t colIndex; // start row index + bool hasKey; + SStateKeys stateKey; + SColumnInfoData timeWindowData; // query time window info for scalar function execution. + // bool reptScan; } SStateWindowOperatorInfo; typedef struct SSortedMergeOperatorInfo { + SOptrBasicInfo binfo; SArray* pSortInfo; int32_t numOfSources; @@ -558,22 +563,26 @@ typedef struct SSortedMergeOperatorInfo { } SSortedMergeOperatorInfo; typedef struct SSortOperatorInfo { - uint32_t sortBufSize; // max buffer size for in-memory sort - SSDataBlock *pDataBlock; - SArray* pSortInfo; - SSortHandle *pSortHandle; - SArray* inputSlotMap; // for index map from table scan output - int32_t bufPageSize; - int32_t numOfRowsInRes; + uint32_t sortBufSize; // max buffer size for in-memory sort + SSDataBlock* pDataBlock; + SArray* pSortInfo; + SSortHandle* pSortHandle; + SArray* inputSlotMap; // for index map from table scan output + int32_t bufPageSize; + int32_t numOfRowsInRes; // TODO extact struct - int64_t startTs; // sort start time - uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included. - uint64_t totalSize; // total load bytes from remote - uint64_t totalRows; // total number of rows - uint64_t totalElapsed; // total elapsed time + int64_t startTs; // sort start time + uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included. + uint64_t totalSize; // total load bytes from remote + uint64_t totalRows; // total number of rows + uint64_t totalElapsed; // total elapsed time } SSortOperatorInfo; +typedef struct STagFilterOperatorInfo { + SOptrBasicInfo binfo; +} STagFilterOperatorInfo; + typedef struct SJoinOperatorInfo { SSDataBlock *pRes; int32_t joinType; @@ -593,23 +602,28 @@ typedef struct SJoinOperatorInfo { } SJoinOperatorInfo; int32_t operatorDummyOpenFn(SOperatorInfo* pOperator); -void operatorDummyCloseFn(void* param, int32_t numOfCols); +void operatorDummyCloseFn(void* param, int32_t numOfCols); int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num); int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, int32_t numOfRows, SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey); -void toSDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, int32_t* rowCellOffset); -void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset); -void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order); -int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int16_t bytes, int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup); -void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput); -int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, - char* pData, int32_t compLen, int32_t numOfOutput, int64_t startTs, - uint64_t* total, SArray* pColList); -void doSetOperatorCompleted(SOperatorInfo* pOperator); -void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock); +void toSDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, + SDiskbasedBuf* pBuf, int32_t* rowCellOffset); +void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, + SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset); +void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, + int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order); +int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int16_t bytes, + int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup); +void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput); +int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData, + int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total, + SArray* pColList); +void doSetOperatorCompleted(SOperatorInfo* pOperator); +void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock); SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset); SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); + SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t dataLoadFlag, int32_t repeatTime, int32_t reverseTime, SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition, SExecTaskInfo* pTaskInfo); SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, @@ -620,23 +634,34 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pIndexMap, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); + SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName, - SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId); -SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlot, + SNode* pCondition, SEpSet epset, SArray* colList, + SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId); +SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, + SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlot, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, - SNode* pCondition, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); +SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, + SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, + SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition, + SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, + const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pResBlock, SArray* pColList, SArray* pTableIdList, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pResBlock, SArray* pColList, + SArray* pTableIdList, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock, - int32_t fillType, char* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, + SInterval* pInterval, SSDataBlock* pResBlock, int32_t fillType, char* fillVal, + bool multigroupResult, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, + SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, - SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); -SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, + SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo, + const STableGroupInfo* pTableGroupInfo); +SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, + SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo); SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo); @@ -649,7 +674,8 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRun SOperatorInfo* createTagScanOperatorInfo(SReaderHandle* pReaderHandle, SExprInfo* pExpr, int32_t numOfOutput); #endif -void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, int32_t numOfOutput, SArray* pPseudoList); +void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, + int32_t numOfOutput, SArray* pPseudoList); void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); @@ -661,23 +687,27 @@ STableQueryInfo* createTableQueryInfo(void* buf, bool groupbyColumn, STimeWindow bool isTaskKilled(SExecTaskInfo* pTaskInfo); int32_t checkForQueryBuf(size_t numOfTables); -void setTaskKilled(SExecTaskInfo* pTaskInfo); +void setTaskKilled(SExecTaskInfo* pTaskInfo); void publishOperatorProfEvent(SOperatorInfo* operatorInfo, EQueryProfEventType eventType); void publishQueryAbortEvent(SExecTaskInfo* pTaskInfo, int32_t code); void queryCostStatis(SExecTaskInfo* pTaskInfo); -void doDestroyTask(SExecTaskInfo* pTaskInfo); +void doDestroyTask(SExecTaskInfo* pTaskInfo); int32_t getMaximumIdleDurationSec(); void doInvokeUdf(struct SUdfInfo* pUdfInfo, SqlFunctionCtx* pCtx, int32_t idx, int32_t type); void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); -int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, EOPTR_EXEC_MODEL model); -int32_t getOperatorExplainExecInfo(SOperatorInfo *operatorInfo, SExplainExecInfo **pRes, int32_t *capacity, int32_t *resNum); - -bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter *pSup, SOptrBasicInfo *pInfo, char* result, int32_t length); -void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter *pSup, SOptrBasicInfo *pInfo, char **result, int32_t *length); +int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, + EOPTR_EXEC_MODEL model); +int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo** pRes, int32_t* capacity, + int32_t* resNum); + +bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasicInfo* pInfo, char* result, + int32_t length); +void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasicInfo* pInfo, char** result, + int32_t* length); #ifdef __cplusplus } diff --git a/source/libs/executor/inc/indexoperator.h b/source/libs/executor/inc/indexoperator.h new file mode 100644 index 0000000000000000000000000000000000000000..9e67ac7f4139959ca4040e1b21cbeb1c20dfa1d8 --- /dev/null +++ b/source/libs/executor/inc/indexoperator.h @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "filter.h" +#include "tglobal.h" + +typedef enum { SFLT_NOT_INDEX, SFLT_COARSE_INDEX, SFLT_ACCURATE_INDEX } SIdxFltStatus; + +SIdxFltStatus idxGetFltStatus(SNode *pFilterNode); +// construct tag filter operator later +int32_t doFilterTag(const SNode *pFilterNode, SArray *resutl); diff --git a/source/libs/executor/src/indexoperator.c b/source/libs/executor/src/indexoperator.c new file mode 100644 index 0000000000000000000000000000000000000000..b5b9cdb7403d6e769304bd78031c928a11a94cbf --- /dev/null +++ b/source/libs/executor/src/indexoperator.c @@ -0,0 +1,198 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "indexoperator.h" +#include "executorimpl.h" +#include "nodes.h" + +typedef struct SIFCtx { + int32_t code; + SHashObj *pRes; /* element is SScalarParam */ +} SIFCtx; + +typedef struct SIFParam { + SArray * result; + SHashObj *pFilter; +} SIFParam; +// construct tag filter operator later +static void destroyTagFilterOperatorInfo(void *param) { + STagFilterOperatorInfo *pInfo = (STagFilterOperatorInfo *)param; +} + +static void sifFreeParam(SIFParam *param) { + if (param == NULL) return; + taosArrayDestroy(param->result); +} + +int32_t sifInitOperParams(SIFParam *params, SOperatorNode *node, SIFCtx *ctx) { + int32_t code = 0; + return code; +} +static int32_t sifExecFunction(SFunctionNode *node, SIFCtx *ctx, SIFParam *output) { + qError("index-filter not support buildin function"); + return TSDB_CODE_SUCCESS; +} +static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) { + SIFParam *params = NULL; + + return TSDB_CODE_SUCCESS; +} + +static int32_t sifExecLogic(SLogicConditionNode *node, SIFCtx *ctx, SIFParam *output) { return TSDB_CODE_SUCCESS; } + +static EDealRes sifWalkFunction(SNode *pNode, void *context) { + // impl later + SFunctionNode *node = (SFunctionNode *)pNode; + SIFParam output = {0}; + + SIFCtx *ctx = context; + ctx->code = sifExecFunction(node, ctx, &output); + if (ctx->code != TSDB_CODE_SUCCESS) { + return DEAL_RES_ERROR; + } + + if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) { + ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY; + return DEAL_RES_ERROR; + } + return DEAL_RES_CONTINUE; +} +static EDealRes sifWalkLogic(SNode *pNode, void *context) { + SLogicConditionNode *node = (SLogicConditionNode *)pNode; + SIFParam output = {0}; + + SIFCtx *ctx = context; + ctx->code = sifExecLogic(node, ctx, &output); + if (ctx->code) { + return DEAL_RES_ERROR; + } + + if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) { + ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY; + return DEAL_RES_ERROR; + } + return DEAL_RES_CONTINUE; +} +static EDealRes sifWalkOper(SNode *pNode, void *context) { + SOperatorNode *node = (SOperatorNode *)pNode; + SIFParam output = {0}; + + SIFCtx *ctx = context; + ctx->code = sifExecOper(node, ctx, &output); + if (ctx->code) { + return DEAL_RES_ERROR; + } + + if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) { + ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY; + return DEAL_RES_ERROR; + } + + return DEAL_RES_CONTINUE; +} + +EDealRes sifCalcWalker(SNode *node, void *context) { + if (QUERY_NODE_VALUE == nodeType(node) || QUERY_NODE_NODE_LIST == nodeType(node) || + QUERY_NODE_COLUMN == nodeType(node)) { + return DEAL_RES_CONTINUE; + } + SIFCtx *ctx = (SIFCtx *)context; + if (QUERY_NODE_FUNCTION == nodeType(node)) { + return sifWalkFunction(node, ctx); + } + if (QUERY_NODE_LOGIC_CONDITION == nodeType(node)) { + return sifWalkLogic(node, ctx); + } + if (QUERY_NODE_OPERATOR == nodeType(node)) { + return sifWalkOper(node, ctx); + } + + qError("invalid node type for index filter calculating, type:%d", nodeType(node)); + ctx->code = TSDB_CODE_QRY_INVALID_INPUT; + return DEAL_RES_ERROR; +} + +void sifFreeRes(SHashObj *res) { + void *pIter = taosHashIterate(res, NULL); + while (pIter) { + SIFParam *p = pIter; + if (p) { + sifFreeParam(p); + } + pIter = taosHashIterate(res, pIter); + } + taosHashCleanup(res); +} +static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) { + if (pNode == NULL || pDst == NULL) { + return TSDB_CODE_QRY_INVALID_INPUT; + } + int32_t code = 0; + SIFCtx ctx = {.code = 0}; + ctx.pRes = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + if (NULL == ctx.pRes) { + qError("index-filter failed to taosHashInit"); + return TSDB_CODE_QRY_OUT_OF_MEMORY; + } + nodesWalkExprPostOrder(pNode, sifCalcWalker, &ctx); + if (ctx.code != TSDB_CODE_SUCCESS) { + return ctx.code; + } + if (pDst) { + SIFParam *res = (SIFParam *)taosHashGet(ctx.pRes, (void *)&pNode, POINTER_BYTES); + if (res == NULL) { + qError("no valid res in hash, node:(%p), type(%d)", (void *)&pNode, nodeType(pNode)); + return TSDB_CODE_QRY_APP_ERROR; + } + taosArrayAddAll(pDst->result, res->result); + + sifFreeParam(res); + taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES); + } + return TSDB_CODE_SUCCESS; +} + +int32_t doFilterTag(const SNode *pFilterNode, SArray *result) { + if (pFilterNode == NULL) { + return TSDB_CODE_SUCCESS; + } + + SFilterInfo *filter = NULL; + // todo move to the initialization function + int32_t code = filterInitFromNode((SNode *)pFilterNode, &filter, 0); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + SIFParam param = {0}; + code = sifCalculate((SNode *)pFilterNode, ¶m); + + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + taosArrayAddAll(result, param.result); + sifFreeParam(¶m); + + return code; +} + +SIdxFltStatus idxGetFltStatus(SNode *pFilterNode) { + if (pFilterNode == NULL) { + return SFLT_NOT_INDEX; + } + // impl later + return SFLT_ACCURATE_INDEX; +}