提交 398d7d61 编写于 作者: P plum-lihui

Merge branch '3.0' of github.com:taosdata/TDengine into 3.0

...@@ -20,9 +20,9 @@ extern "C" { ...@@ -20,9 +20,9 @@ extern "C" {
#endif #endif
#include "os.h" #include "os.h"
#include "tsort.h"
#include "tcommon.h" #include "tcommon.h"
#include "tlosertree.h" #include "tlosertree.h"
#include "tsort.h"
#include "ttszip.h" #include "ttszip.h"
#include "tvariant.h" #include "tvariant.h"
...@@ -35,13 +35,13 @@ extern "C" { ...@@ -35,13 +35,13 @@ extern "C" {
#include "tarray.h" #include "tarray.h"
#include "thash.h" #include "thash.h"
#include "tlockfree.h" #include "tlockfree.h"
#include "tpagedbuf.h"
#include "tmsg.h" #include "tmsg.h"
#include "tpagedbuf.h"
typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order); typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order);
#define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED) #define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED)
#define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0u) #define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0u)
#define QUERY_IS_ASC_QUERY(q) (GET_FORWARD_DIRECTION_FACTOR((q)->order.order) == QUERY_ASC_FORWARD_STEP) #define QUERY_IS_ASC_QUERY(q) (GET_FORWARD_DIRECTION_FACTOR((q)->order.order) == QUERY_ASC_FORWARD_STEP)
#define GET_TABLEGROUP(q, _index) ((SArray*)taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index))) #define GET_TABLEGROUP(q, _index) ((SArray*)taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index)))
...@@ -65,7 +65,7 @@ enum { ...@@ -65,7 +65,7 @@ enum {
}; };
typedef struct SResultRowCell { typedef struct SResultRowCell {
uint64_t groupId; uint64_t groupId;
SResultRowPosition pos; SResultRowPosition pos;
} SResultRowCell; } SResultRowCell;
...@@ -73,19 +73,19 @@ typedef struct SResultRowCell { ...@@ -73,19 +73,19 @@ typedef struct SResultRowCell {
* If the number of generated results is greater than this value, * If the number of generated results is greater than this value,
* query query will be halt and return results to client immediate. * query query will be halt and return results to client immediate.
*/ */
typedef struct SResultInfo { // TODO refactor typedef struct SResultInfo { // TODO refactor
int64_t totalRows; // total generated result size in rows int64_t totalRows; // total generated result size in rows
int64_t totalBytes; // total results in bytes. int64_t totalBytes; // total results in bytes.
int32_t capacity; // capacity of current result output buffer int32_t capacity; // capacity of current result output buffer
int32_t threshold; // result size threshold in rows. int32_t threshold; // result size threshold in rows.
} SResultInfo; } SResultInfo;
typedef struct STableQueryInfo { typedef struct STableQueryInfo {
TSKEY lastKey; // last check ts TSKEY lastKey; // last check ts
uint64_t uid; // table uid uint64_t uid; // table uid
int32_t groupIndex; // group id in table list int32_t groupIndex; // group id in table list
// SVariant tag; // SVariant tag;
SResultRowInfo resInfo; // result info SResultRowInfo resInfo; // result info
} STableQueryInfo; } STableQueryInfo;
typedef enum { typedef enum {
...@@ -153,27 +153,27 @@ typedef struct SOperatorCostInfo { ...@@ -153,27 +153,27 @@ typedef struct SOperatorCostInfo {
// The basic query information extracted from the SQueryInfo tree to support the // The basic query information extracted from the SQueryInfo tree to support the
// execution of query in a data node. // execution of query in a data node.
typedef struct STaskAttr { typedef struct STaskAttr {
SLimit limit; SLimit limit;
SLimit slimit; SLimit slimit;
bool stableQuery; // super table query or not bool stableQuery; // super table query or not
bool topBotQuery; // TODO used bitwise flag bool topBotQuery; // TODO used bitwise flag
bool groupbyColumn; // denote if this is a groupby normal column query bool groupbyColumn; // denote if this is a groupby normal column query
bool timeWindowInterpo; // if the time window start/end required interpolation bool timeWindowInterpo; // if the time window start/end required interpolation
bool tsCompQuery; // is tscomp query bool tsCompQuery; // is tscomp query
bool diffQuery; // is diff query bool diffQuery; // is diff query
bool pointInterpQuery; // point interpolation query bool pointInterpQuery; // point interpolation query
int32_t havingNum; // having expr number int32_t havingNum; // having expr number
int16_t numOfCols; int16_t numOfCols;
int16_t numOfTags; int16_t numOfTags;
STimeWindow window; STimeWindow window;
SInterval interval; SInterval interval;
int16_t precision; int16_t precision;
int16_t numOfOutput; int16_t numOfOutput;
int16_t fillType; int16_t fillType;
int32_t resultRowSize; int32_t resultRowSize;
int32_t tagLen; // tag value length of current query int32_t tagLen; // tag value length of current query
SExprInfo *pExpr1; SExprInfo* pExpr1;
SColumnInfo* tagColList; SColumnInfo* tagColList;
int32_t numOfFilterCols; int32_t numOfFilterCols;
int64_t* fillVal; int64_t* fillVal;
...@@ -186,13 +186,15 @@ struct SOperatorInfo; ...@@ -186,13 +186,15 @@ struct SOperatorInfo;
struct SAggSupporter; struct SAggSupporter;
struct SOptrBasicInfo; struct SOptrBasicInfo;
typedef void (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, struct SAggSupporter *pSup, struct SOptrBasicInfo *pInfo, char **result, int32_t *length); typedef void (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, struct SAggSupporter* pSup,
typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, struct SAggSupporter *pSup, struct SOptrBasicInfo *pInfo, char *result, int32_t length); struct SOptrBasicInfo* pInfo, char** result, int32_t* length);
typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, struct SAggSupporter* pSup,
struct SOptrBasicInfo* pInfo, char* result, int32_t length);
typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr); typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr);
typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr, bool* newgroup); typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr, bool* newgroup);
typedef void (*__optr_close_fn_t)(void* param, int32_t num); typedef void (*__optr_close_fn_t)(void* param, int32_t num);
typedef int32_t (*__optr_get_explain_fn_t)(struct SOperatorInfo* pOptr, void **pOptrExplain); typedef int32_t (*__optr_get_explain_fn_t)(struct SOperatorInfo* pOptr, void** pOptrExplain);
typedef struct STaskIdInfo { typedef struct STaskIdInfo {
uint64_t queryId; // this is also a request id uint64_t queryId; // this is also a request id
...@@ -202,21 +204,22 @@ typedef struct STaskIdInfo { ...@@ -202,21 +204,22 @@ typedef struct STaskIdInfo {
} STaskIdInfo; } STaskIdInfo;
typedef struct SExecTaskInfo { typedef struct SExecTaskInfo {
STaskIdInfo id; STaskIdInfo id;
uint32_t status; uint32_t status;
STimeWindow window; STimeWindow window;
STaskCostInfo cost; STaskCostInfo cost;
int64_t owner; // if it is in execution int64_t owner; // if it is in execution
int32_t code; int32_t code;
uint64_t totalRows; // total number of rows uint64_t totalRows; // total number of rows
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
char* sql; // query sql string char* sql; // query sql string
jmp_buf env; // jump to this position when error happens. jmp_buf env; // jump to this position when error happens.
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
struct SOperatorInfo* pRoot; struct SOperatorInfo* pRoot;
} SExecTaskInfo; } SExecTaskInfo;
typedef struct STaskRuntimeEnv { typedef struct STaskRuntimeEnv {
jmp_buf env; jmp_buf env;
STaskAttr* pQueryAttr; STaskAttr* pQueryAttr;
uint32_t status; // query status uint32_t status; // query status
...@@ -230,17 +233,17 @@ typedef struct STaskRuntimeEnv { ...@@ -230,17 +233,17 @@ typedef struct STaskRuntimeEnv {
SArray* pResultRowArrayList; // The array list that contains the Result rows SArray* pResultRowArrayList; // The array list that contains the Result rows
char* keyBuf; // window key buffer char* keyBuf; // window key buffer
// The window result objects pool, all the resultRow Objects are allocated and managed by this object. // The window result objects pool, all the resultRow Objects are allocated and managed by this object.
char** prevRow; char** prevRow;
SArray* prevResult; // intermediate result, SArray<SInterResult> SArray* prevResult; // intermediate result, SArray<SInterResult>
STSBuf* pTsBuf; // timestamp filter list STSBuf* pTsBuf; // timestamp filter list
STSCursor cur; STSCursor cur;
char* tagVal; // tag value of current data block char* tagVal; // tag value of current data block
struct SScalarFunctionSupport* scalarSup; struct SScalarFunctionSupport* scalarSup;
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
struct SOperatorInfo* proot; struct SOperatorInfo* proot;
SGroupResInfo groupResInfo; SGroupResInfo groupResInfo;
int64_t currentOffset; // dynamic offset value int64_t currentOffset; // dynamic offset value
STableQueryInfo* current; STableQueryInfo* current;
SResultInfo resultInfo; SResultInfo resultInfo;
...@@ -248,10 +251,10 @@ typedef struct STaskRuntimeEnv { ...@@ -248,10 +251,10 @@ typedef struct STaskRuntimeEnv {
} STaskRuntimeEnv; } STaskRuntimeEnv;
enum { enum {
OP_NOT_OPENED = 0x0, OP_NOT_OPENED = 0x0,
OP_OPENED = 0x1, OP_OPENED = 0x1,
OP_RES_TO_RETURN = 0x5, OP_RES_TO_RETURN = 0x5,
OP_EXEC_DONE = 0x9, OP_EXEC_DONE = 0x9,
}; };
typedef struct SOperatorInfo { typedef struct SOperatorInfo {
...@@ -262,7 +265,7 @@ typedef struct SOperatorInfo { ...@@ -262,7 +265,7 @@ typedef struct SOperatorInfo {
char* name; // name, used to show the query execution plan char* name; // name, used to show the query execution plan
void* info; // extension attribution void* info; // extension attribution
SExprInfo* pExpr; SExprInfo* pExpr;
STaskRuntimeEnv* pRuntimeEnv; // todo remove it STaskRuntimeEnv* pRuntimeEnv; // todo remove it
SExecTaskInfo* pTaskInfo; SExecTaskInfo* pTaskInfo;
SOperatorCostInfo cost; SOperatorCostInfo cost;
SResultInfo resultInfo; SResultInfo resultInfo;
...@@ -270,8 +273,8 @@ typedef struct SOperatorInfo { ...@@ -270,8 +273,8 @@ typedef struct SOperatorInfo {
int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator
__optr_open_fn_t _openFn; // DO NOT invoke this function directly __optr_open_fn_t _openFn; // DO NOT invoke this function directly
__optr_fn_t getNextFn; __optr_fn_t getNextFn;
__optr_fn_t getStreamResFn; // execute the aggregate in the stream model. __optr_fn_t getStreamResFn; // execute the aggregate in the stream model.
__optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP __optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP
__optr_close_fn_t closeFn; __optr_close_fn_t closeFn;
__optr_encode_fn_t encodeResultRow; __optr_encode_fn_t encodeResultRow;
__optr_decode_fn_t decodeResultRow; __optr_decode_fn_t decodeResultRow;
...@@ -286,33 +289,33 @@ typedef struct { ...@@ -286,33 +289,33 @@ typedef struct {
typedef enum { typedef enum {
EX_SOURCE_DATA_NOT_READY = 0x1, EX_SOURCE_DATA_NOT_READY = 0x1,
EX_SOURCE_DATA_READY = 0x2, EX_SOURCE_DATA_READY = 0x2,
EX_SOURCE_DATA_EXHAUSTED = 0x3, EX_SOURCE_DATA_EXHAUSTED = 0x3,
} EX_SOURCE_STATUS; } EX_SOURCE_STATUS;
typedef struct SSourceDataInfo { typedef struct SSourceDataInfo {
struct SExchangeInfo *pEx; struct SExchangeInfo* pEx;
int32_t index; int32_t index;
SRetrieveTableRsp *pRsp; SRetrieveTableRsp* pRsp;
uint64_t totalRows; uint64_t totalRows;
int32_t code; int32_t code;
EX_SOURCE_STATUS status; EX_SOURCE_STATUS status;
} SSourceDataInfo; } SSourceDataInfo;
typedef struct SLoadRemoteDataInfo { typedef struct SLoadRemoteDataInfo {
uint64_t totalSize; // total load bytes from remote uint64_t totalSize; // total load bytes from remote
uint64_t totalRows; // total number of rows uint64_t totalRows; // total number of rows
uint64_t totalElapsed; // total elapsed time uint64_t totalElapsed; // total elapsed time
} SLoadRemoteDataInfo; } SLoadRemoteDataInfo;
typedef struct SExchangeInfo { typedef struct SExchangeInfo {
SArray* pSources; SArray* pSources;
SArray* pSourceDataInfo; SArray* pSourceDataInfo;
tsem_t ready; tsem_t ready;
void* pTransporter; void* pTransporter;
SSDataBlock* pResult; SSDataBlock* pResult;
bool seqLoadData; // sequential load data or not, false by default bool seqLoadData; // sequential load data or not, false by default
int32_t current; int32_t current;
SLoadRemoteDataInfo loadInfo; SLoadRemoteDataInfo loadInfo;
} SExchangeInfo; } SExchangeInfo;
...@@ -333,7 +336,7 @@ typedef struct STableScanInfo { ...@@ -333,7 +336,7 @@ typedef struct STableScanInfo {
int32_t current; int32_t current;
int32_t reverseTimes; // 0 by default int32_t reverseTimes; // 0 by default
SNode* pFilterNode; // filter operator info SNode* pFilterNode; // filter operator info
SqlFunctionCtx* pCtx; // next operator query context SqlFunctionCtx* pCtx; // next operator query context
SResultRowInfo* pResultRowInfo; SResultRowInfo* pResultRowInfo;
int32_t* rowCellInfoOffset; int32_t* rowCellInfoOffset;
SExprInfo* pExpr; SExprInfo* pExpr;
...@@ -342,6 +345,7 @@ typedef struct STableScanInfo { ...@@ -342,6 +345,7 @@ typedef struct STableScanInfo {
int32_t numOfOutput; int32_t numOfOutput;
int64_t elapsedTime; int64_t elapsedTime;
int32_t prevGroupId; // previous table group id int32_t prevGroupId; // previous table group id
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
int32_t dataBlockLoadFlag; int32_t dataBlockLoadFlag;
} STableScanInfo; } STableScanInfo;
...@@ -354,15 +358,15 @@ typedef struct STagScanInfo { ...@@ -354,15 +358,15 @@ typedef struct STagScanInfo {
} STagScanInfo; } STagScanInfo;
typedef struct SStreamBlockScanInfo { typedef struct SStreamBlockScanInfo {
SArray* pBlockLists; // multiple SSDatablock. SArray* pBlockLists; // multiple SSDatablock.
SSDataBlock* pRes; // result SSDataBlock SSDataBlock* pRes; // result SSDataBlock
int32_t blockType; // current block type int32_t blockType; // current block type
int32_t validBlockIndex; // Is current data has returned? int32_t validBlockIndex; // Is current data has returned?
SColumnInfo* pCols; // the output column info SColumnInfo* pCols; // the output column info
uint64_t numOfRows; // total scanned rows uint64_t numOfRows; // total scanned rows
uint64_t numOfExec; // execution times uint64_t numOfExec; // execution times
void* readerHandle; // stream block reader handle void* readerHandle; // stream block reader handle
SArray* pColMatchInfo; // SArray* pColMatchInfo; //
} SStreamBlockScanInfo; } SStreamBlockScanInfo;
typedef struct SSysTableScanInfo { typedef struct SSysTableScanInfo {
...@@ -371,83 +375,83 @@ typedef struct SSysTableScanInfo { ...@@ -371,83 +375,83 @@ typedef struct SSysTableScanInfo {
void* readHandle; void* readHandle;
}; };
SRetrieveMetaTableRsp *pRsp; SRetrieveMetaTableRsp* pRsp;
SRetrieveTableReq req; SRetrieveTableReq req;
SEpSet epSet; SEpSet epSet;
tsem_t ready; tsem_t ready;
int32_t accountId; int32_t accountId;
bool showRewrite; bool showRewrite;
SNode *pCondition; // db_name filter condition, to discard data that are not in current database SNode* pCondition; // db_name filter condition, to discard data that are not in current database
void *pCur; // cursor for iterate the local table meta store. void* pCur; // cursor for iterate the local table meta store.
SArray *scanCols; // SArray<int16_t> scan column id list SArray* scanCols; // SArray<int16_t> scan column id list
int32_t type; // show type, TODO remove it int32_t type; // show type, TODO remove it
SName name; SName name;
SSDataBlock *pRes; SSDataBlock* pRes;
int32_t capacity; int32_t capacity;
int64_t numOfBlocks; // extract basic running information. int64_t numOfBlocks; // extract basic running information.
SLoadRemoteDataInfo loadInfo; SLoadRemoteDataInfo loadInfo;
} SSysTableScanInfo; } SSysTableScanInfo;
typedef struct SOptrBasicInfo { typedef struct SOptrBasicInfo {
SResultRowInfo resultRowInfo; SResultRowInfo resultRowInfo;
int32_t* rowCellInfoOffset; // offset value for each row result cell info int32_t* rowCellInfoOffset; // offset value for each row result cell info
SqlFunctionCtx* pCtx; SqlFunctionCtx* pCtx;
SSDataBlock* pRes; SSDataBlock* pRes;
int32_t capacity; // TODO remove it int32_t capacity; // TODO remove it
} SOptrBasicInfo; } SOptrBasicInfo;
//TODO move the resultrowsiz together with SOptrBasicInfo:rowCellInfoOffset // TODO move the resultrowsiz together with SOptrBasicInfo:rowCellInfoOffset
typedef struct SAggSupporter { typedef struct SAggSupporter {
SHashObj* pResultRowHashTable; // quick locate the window object for each result SHashObj* pResultRowHashTable; // quick locate the window object for each result
SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not
SArray* pResultRowArrayList; // The array list that contains the Result rows SArray* pResultRowArrayList; // The array list that contains the Result rows
char* keyBuf; // window key buffer char* keyBuf; // window key buffer
SDiskbasedBuf *pResultBuf; // query result buffer based on blocked-wised disk file SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
} SAggSupporter; } SAggSupporter;
typedef struct STableIntervalOperatorInfo { typedef struct STableIntervalOperatorInfo {
SOptrBasicInfo binfo; // basic info SOptrBasicInfo binfo; // basic info
SGroupResInfo groupResInfo; // multiple results build supporter SGroupResInfo groupResInfo; // multiple results build supporter
SInterval interval; // interval info SInterval interval; // interval info
int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator. int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator.
STimeWindow win; // query time range STimeWindow win; // query time range
bool timeWindowInterpo; // interpolation needed or not bool timeWindowInterpo; // interpolation needed or not
char **pRow; // previous row/tuple of already processed datablock char** pRow; // previous row/tuple of already processed datablock
SAggSupporter aggSup; // aggregate supporter SAggSupporter aggSup; // aggregate supporter
STableQueryInfo *pCurrent; // current tableQueryInfo struct STableQueryInfo* pCurrent; // current tableQueryInfo struct
int32_t order; // current SSDataBlock scan order int32_t order; // current SSDataBlock scan order
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
SArray *pUpdatedWindow; // updated time window due to the input data block from the downstream operator. SArray* pUpdatedWindow; // updated time window due to the input data block from the downstream operator.
SColumnInfoData timeWindowData; // query time window info for scalar function execution. SColumnInfoData timeWindowData; // query time window info for scalar function execution.
} STableIntervalOperatorInfo; } STableIntervalOperatorInfo;
typedef struct SAggOperatorInfo { typedef struct SAggOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
SDiskbasedBuf *pResultBuf; // query result buffer based on blocked-wised disk file SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
SAggSupporter aggSup; SAggSupporter aggSup;
STableQueryInfo *current; STableQueryInfo* current;
uint32_t groupId; uint32_t groupId;
SGroupResInfo groupResInfo; SGroupResInfo groupResInfo;
STableQueryInfo *pTableQueryInfo; STableQueryInfo* pTableQueryInfo;
} SAggOperatorInfo; } SAggOperatorInfo;
typedef struct SProjectOperatorInfo { typedef struct SProjectOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
SAggSupporter aggSup; SAggSupporter aggSup;
SSDataBlock *existDataBlock; SSDataBlock* existDataBlock;
SArray *pPseudoColInfo; SArray* pPseudoColInfo;
SLimit limit; SLimit limit;
SLimit slimit; SLimit slimit;
uint64_t groupId; uint64_t groupId;
int64_t curSOffset; int64_t curSOffset;
int64_t curGroupOutput; int64_t curGroupOutput;
int64_t curOffset; int64_t curOffset;
int64_t curOutput; int64_t curOutput;
} SProjectOperatorInfo; } SProjectOperatorInfo;
typedef struct SFillOperatorInfo { typedef struct SFillOperatorInfo {
...@@ -462,86 +466,87 @@ typedef struct SFillOperatorInfo { ...@@ -462,86 +466,87 @@ typedef struct SFillOperatorInfo {
} SFillOperatorInfo; } SFillOperatorInfo;
typedef struct { typedef struct {
char *pData; char* pData;
bool isNull; bool isNull;
int16_t type; int16_t type;
int32_t bytes; int32_t bytes;
} SGroupKeys, SStateKeys; } SGroupKeys, SStateKeys;
typedef struct SGroupbyOperatorInfo { typedef struct SGroupbyOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
SArray* pGroupCols; SArray* pGroupCols;
SArray* pGroupColVals; // current group column values, SArray<SGroupKeys> SArray* pGroupColVals; // current group column values, SArray<SGroupKeys>
SNode* pCondition; SNode* pCondition;
bool isInit; // denote if current val is initialized or not bool isInit; // denote if current val is initialized or not
char* keyBuf; // group by keys for hash char* keyBuf; // group by keys for hash
int32_t groupKeyLen; // total group by column width int32_t groupKeyLen; // total group by column width
SGroupResInfo groupResInfo; SGroupResInfo groupResInfo;
SAggSupporter aggSup; SAggSupporter aggSup;
SExprInfo* pScalarExprInfo; SExprInfo* pScalarExprInfo;
int32_t numOfScalarExpr;// the number of scalar expression in group operator int32_t numOfScalarExpr; // the number of scalar expression in group operator
SqlFunctionCtx*pScalarFuncCtx; SqlFunctionCtx* pScalarFuncCtx;
} SGroupbyOperatorInfo; } SGroupbyOperatorInfo;
typedef struct SDataGroupInfo { typedef struct SDataGroupInfo {
uint64_t groupId; uint64_t groupId;
int64_t numOfRows; int64_t numOfRows;
SArray *pPageList; SArray* pPageList;
} SDataGroupInfo; } SDataGroupInfo;
// The sort in partition may be needed later. // The sort in partition may be needed later.
typedef struct SPartitionOperatorInfo { typedef struct SPartitionOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
SArray* pGroupCols; SArray* pGroupCols;
SArray* pGroupColVals; // current group column values, SArray<SGroupKeys> SArray* pGroupColVals; // current group column values, SArray<SGroupKeys>
char* keyBuf; // group by keys for hash char* keyBuf; // group by keys for hash
int32_t groupKeyLen; // total group by column width int32_t groupKeyLen; // total group by column width
SHashObj* pGroupSet; // quick locate the window object for each result SHashObj* pGroupSet; // quick locate the window object for each result
SDiskbasedBuf* pBuf; // query result buffer based on blocked-wised disk file SDiskbasedBuf* pBuf; // query result buffer based on blocked-wised disk file
int32_t rowCapacity; // maximum number of rows for each buffer page int32_t rowCapacity; // maximum number of rows for each buffer page
int32_t* columnOffset; // start position for each column data int32_t* columnOffset; // start position for each column data
void* pGroupIter; // group iterator void* pGroupIter; // group iterator
int32_t pageIndex; // page index of current group int32_t pageIndex; // page index of current group
} SPartitionOperatorInfo; } SPartitionOperatorInfo;
typedef struct SWindowRowsSup { typedef struct SWindowRowsSup {
STimeWindow win; STimeWindow win;
TSKEY prevTs; TSKEY prevTs;
int32_t startRowIndex; int32_t startRowIndex;
int32_t numOfRows; int32_t numOfRows;
} SWindowRowsSup; } SWindowRowsSup;
typedef struct SSessionAggOperatorInfo { typedef struct SSessionAggOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
SAggSupporter aggSup; SAggSupporter aggSup;
SGroupResInfo groupResInfo; SGroupResInfo groupResInfo;
SWindowRowsSup winSup; SWindowRowsSup winSup;
bool reptScan; // next round scan bool reptScan; // next round scan
int64_t gap; // session window gap int64_t gap; // session window gap
SColumnInfoData timeWindowData; // query time window info for scalar function execution. SColumnInfoData timeWindowData; // query time window info for scalar function execution.
} SSessionAggOperatorInfo; } SSessionAggOperatorInfo;
typedef struct STimeSliceOperatorInfo { typedef struct STimeSliceOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
SInterval interval; SInterval interval;
SGroupResInfo groupResInfo; // multiple results build supporter SGroupResInfo groupResInfo; // multiple results build supporter
} STimeSliceOperatorInfo; } STimeSliceOperatorInfo;
typedef struct SStateWindowOperatorInfo { typedef struct SStateWindowOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
SAggSupporter aggSup; SAggSupporter aggSup;
SGroupResInfo groupResInfo; SGroupResInfo groupResInfo;
SWindowRowsSup winSup; SWindowRowsSup winSup;
int32_t colIndex; // start row index int32_t colIndex; // start row index
bool hasKey; bool hasKey;
SStateKeys stateKey; SStateKeys stateKey;
SColumnInfoData timeWindowData; // query time window info for scalar function execution. SColumnInfoData timeWindowData; // query time window info for scalar function execution.
// bool reptScan; // bool reptScan;
} SStateWindowOperatorInfo; } SStateWindowOperatorInfo;
typedef struct SSortedMergeOperatorInfo { typedef struct SSortedMergeOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
SArray* pSortInfo; SArray* pSortInfo;
int32_t numOfSources; int32_t numOfSources;
...@@ -558,22 +563,26 @@ typedef struct SSortedMergeOperatorInfo { ...@@ -558,22 +563,26 @@ typedef struct SSortedMergeOperatorInfo {
} SSortedMergeOperatorInfo; } SSortedMergeOperatorInfo;
typedef struct SSortOperatorInfo { typedef struct SSortOperatorInfo {
uint32_t sortBufSize; // max buffer size for in-memory sort uint32_t sortBufSize; // max buffer size for in-memory sort
SSDataBlock *pDataBlock; SSDataBlock* pDataBlock;
SArray* pSortInfo; SArray* pSortInfo;
SSortHandle *pSortHandle; SSortHandle* pSortHandle;
SArray* inputSlotMap; // for index map from table scan output SArray* inputSlotMap; // for index map from table scan output
int32_t bufPageSize; int32_t bufPageSize;
int32_t numOfRowsInRes; int32_t numOfRowsInRes;
// TODO extact struct // TODO extact struct
int64_t startTs; // sort start time int64_t startTs; // sort start time
uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included. uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included.
uint64_t totalSize; // total load bytes from remote uint64_t totalSize; // total load bytes from remote
uint64_t totalRows; // total number of rows uint64_t totalRows; // total number of rows
uint64_t totalElapsed; // total elapsed time uint64_t totalElapsed; // total elapsed time
} SSortOperatorInfo; } SSortOperatorInfo;
typedef struct STagFilterOperatorInfo {
SOptrBasicInfo binfo;
} STagFilterOperatorInfo;
typedef struct SJoinOperatorInfo { typedef struct SJoinOperatorInfo {
SSDataBlock *pRes; SSDataBlock *pRes;
int32_t joinType; int32_t joinType;
...@@ -593,23 +602,28 @@ typedef struct SJoinOperatorInfo { ...@@ -593,23 +602,28 @@ typedef struct SJoinOperatorInfo {
} SJoinOperatorInfo; } SJoinOperatorInfo;
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator); int32_t operatorDummyOpenFn(SOperatorInfo* pOperator);
void operatorDummyCloseFn(void* param, int32_t numOfCols); void operatorDummyCloseFn(void* param, int32_t numOfCols);
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num); int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num);
int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
int32_t numOfRows, SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey); int32_t numOfRows, SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey);
void toSDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, int32_t* rowCellOffset); void toSDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo,
void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset); SDiskbasedBuf* pBuf, int32_t* rowCellOffset);
void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order); void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf,
int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int16_t bytes, int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup); SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset);
void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput); void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset,
int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order);
char* pData, int32_t compLen, int32_t numOfOutput, int64_t startTs, int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int16_t bytes,
uint64_t* total, SArray* pColList); int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup);
void doSetOperatorCompleted(SOperatorInfo* pOperator); void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock); int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData,
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
SArray* pColList);
void doSetOperatorCompleted(SOperatorInfo* pOperator);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock);
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset); SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset);
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t dataLoadFlag, int32_t repeatTime, SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t dataLoadFlag, int32_t repeatTime,
int32_t reverseTime, SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition, SExecTaskInfo* pTaskInfo); int32_t reverseTime, SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
...@@ -620,23 +634,34 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p ...@@ -620,23 +634,34 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p
SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pIndexMap, SExecTaskInfo* pTaskInfo); SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pIndexMap, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName, SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName,
SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId); SNode* pCondition, SEpSet epset, SArray* colList,
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlot, SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlot,
const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SSDataBlock* pResBlock, int64_t gap, SExecTaskInfo* pTaskInfo);
SNode* pCondition, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition,
SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo,
const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo); SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pResBlock, SArray* pColList, SArray* pTableIdList, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pResBlock, SArray* pColList,
SArray* pTableIdList, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock, SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
int32_t fillType, char* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo); SInterval* pInterval, SSDataBlock* pResBlock, int32_t fillType, char* fillVal,
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); bool multigroupResult, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo,
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo); const STableGroupInfo* pTableGroupInfo);
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo); SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo);
...@@ -649,7 +674,8 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRun ...@@ -649,7 +674,8 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRun
SOperatorInfo* createTagScanOperatorInfo(SReaderHandle* pReaderHandle, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createTagScanOperatorInfo(SReaderHandle* pReaderHandle, SExprInfo* pExpr, int32_t numOfOutput);
#endif #endif
void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, int32_t numOfOutput, SArray* pPseudoList); void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
int32_t numOfOutput, SArray* pPseudoList);
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
...@@ -661,23 +687,27 @@ STableQueryInfo* createTableQueryInfo(void* buf, bool groupbyColumn, STimeWindow ...@@ -661,23 +687,27 @@ STableQueryInfo* createTableQueryInfo(void* buf, bool groupbyColumn, STimeWindow
bool isTaskKilled(SExecTaskInfo* pTaskInfo); bool isTaskKilled(SExecTaskInfo* pTaskInfo);
int32_t checkForQueryBuf(size_t numOfTables); int32_t checkForQueryBuf(size_t numOfTables);
void setTaskKilled(SExecTaskInfo* pTaskInfo); void setTaskKilled(SExecTaskInfo* pTaskInfo);
void publishOperatorProfEvent(SOperatorInfo* operatorInfo, EQueryProfEventType eventType); void publishOperatorProfEvent(SOperatorInfo* operatorInfo, EQueryProfEventType eventType);
void publishQueryAbortEvent(SExecTaskInfo* pTaskInfo, int32_t code); void publishQueryAbortEvent(SExecTaskInfo* pTaskInfo, int32_t code);
void queryCostStatis(SExecTaskInfo* pTaskInfo); void queryCostStatis(SExecTaskInfo* pTaskInfo);
void doDestroyTask(SExecTaskInfo* pTaskInfo); void doDestroyTask(SExecTaskInfo* pTaskInfo);
int32_t getMaximumIdleDurationSec(); int32_t getMaximumIdleDurationSec();
void doInvokeUdf(struct SUdfInfo* pUdfInfo, SqlFunctionCtx* pCtx, int32_t idx, int32_t type); void doInvokeUdf(struct SUdfInfo* pUdfInfo, SqlFunctionCtx* pCtx, int32_t idx, int32_t type);
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, EOPTR_EXEC_MODEL model); int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
int32_t getOperatorExplainExecInfo(SOperatorInfo *operatorInfo, SExplainExecInfo **pRes, int32_t *capacity, int32_t *resNum); EOPTR_EXEC_MODEL model);
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo** pRes, int32_t* capacity,
bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter *pSup, SOptrBasicInfo *pInfo, char* result, int32_t length); int32_t* resNum);
void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter *pSup, SOptrBasicInfo *pInfo, char **result, int32_t *length);
bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasicInfo* pInfo, char* result,
int32_t length);
void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasicInfo* pInfo, char** result,
int32_t* length);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "filter.h"
#include "tglobal.h"
typedef enum { SFLT_NOT_INDEX, SFLT_COARSE_INDEX, SFLT_ACCURATE_INDEX } SIdxFltStatus;
SIdxFltStatus idxGetFltStatus(SNode *pFilterNode);
// construct tag filter operator later
int32_t doFilterTag(const SNode *pFilterNode, SArray *resutl);
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "indexoperator.h"
#include "executorimpl.h"
#include "nodes.h"
typedef struct SIFCtx {
int32_t code;
SHashObj *pRes; /* element is SScalarParam */
} SIFCtx;
typedef struct SIFParam {
SArray * result;
SHashObj *pFilter;
} SIFParam;
// construct tag filter operator later
static void destroyTagFilterOperatorInfo(void *param) {
STagFilterOperatorInfo *pInfo = (STagFilterOperatorInfo *)param;
}
static void sifFreeParam(SIFParam *param) {
if (param == NULL) return;
taosArrayDestroy(param->result);
}
int32_t sifInitOperParams(SIFParam *params, SOperatorNode *node, SIFCtx *ctx) {
int32_t code = 0;
return code;
}
static int32_t sifExecFunction(SFunctionNode *node, SIFCtx *ctx, SIFParam *output) {
qError("index-filter not support buildin function");
return TSDB_CODE_SUCCESS;
}
static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) {
SIFParam *params = NULL;
return TSDB_CODE_SUCCESS;
}
static int32_t sifExecLogic(SLogicConditionNode *node, SIFCtx *ctx, SIFParam *output) { return TSDB_CODE_SUCCESS; }
static EDealRes sifWalkFunction(SNode *pNode, void *context) {
// impl later
SFunctionNode *node = (SFunctionNode *)pNode;
SIFParam output = {0};
SIFCtx *ctx = context;
ctx->code = sifExecFunction(node, ctx, &output);
if (ctx->code != TSDB_CODE_SUCCESS) {
return DEAL_RES_ERROR;
}
if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return DEAL_RES_ERROR;
}
return DEAL_RES_CONTINUE;
}
static EDealRes sifWalkLogic(SNode *pNode, void *context) {
SLogicConditionNode *node = (SLogicConditionNode *)pNode;
SIFParam output = {0};
SIFCtx *ctx = context;
ctx->code = sifExecLogic(node, ctx, &output);
if (ctx->code) {
return DEAL_RES_ERROR;
}
if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return DEAL_RES_ERROR;
}
return DEAL_RES_CONTINUE;
}
static EDealRes sifWalkOper(SNode *pNode, void *context) {
SOperatorNode *node = (SOperatorNode *)pNode;
SIFParam output = {0};
SIFCtx *ctx = context;
ctx->code = sifExecOper(node, ctx, &output);
if (ctx->code) {
return DEAL_RES_ERROR;
}
if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return DEAL_RES_ERROR;
}
return DEAL_RES_CONTINUE;
}
EDealRes sifCalcWalker(SNode *node, void *context) {
if (QUERY_NODE_VALUE == nodeType(node) || QUERY_NODE_NODE_LIST == nodeType(node) ||
QUERY_NODE_COLUMN == nodeType(node)) {
return DEAL_RES_CONTINUE;
}
SIFCtx *ctx = (SIFCtx *)context;
if (QUERY_NODE_FUNCTION == nodeType(node)) {
return sifWalkFunction(node, ctx);
}
if (QUERY_NODE_LOGIC_CONDITION == nodeType(node)) {
return sifWalkLogic(node, ctx);
}
if (QUERY_NODE_OPERATOR == nodeType(node)) {
return sifWalkOper(node, ctx);
}
qError("invalid node type for index filter calculating, type:%d", nodeType(node));
ctx->code = TSDB_CODE_QRY_INVALID_INPUT;
return DEAL_RES_ERROR;
}
void sifFreeRes(SHashObj *res) {
void *pIter = taosHashIterate(res, NULL);
while (pIter) {
SIFParam *p = pIter;
if (p) {
sifFreeParam(p);
}
pIter = taosHashIterate(res, pIter);
}
taosHashCleanup(res);
}
static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) {
if (pNode == NULL || pDst == NULL) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
int32_t code = 0;
SIFCtx ctx = {.code = 0};
ctx.pRes = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
if (NULL == ctx.pRes) {
qError("index-filter failed to taosHashInit");
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
nodesWalkExprPostOrder(pNode, sifCalcWalker, &ctx);
if (ctx.code != TSDB_CODE_SUCCESS) {
return ctx.code;
}
if (pDst) {
SIFParam *res = (SIFParam *)taosHashGet(ctx.pRes, (void *)&pNode, POINTER_BYTES);
if (res == NULL) {
qError("no valid res in hash, node:(%p), type(%d)", (void *)&pNode, nodeType(pNode));
return TSDB_CODE_QRY_APP_ERROR;
}
taosArrayAddAll(pDst->result, res->result);
sifFreeParam(res);
taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES);
}
return TSDB_CODE_SUCCESS;
}
int32_t doFilterTag(const SNode *pFilterNode, SArray *result) {
if (pFilterNode == NULL) {
return TSDB_CODE_SUCCESS;
}
SFilterInfo *filter = NULL;
// todo move to the initialization function
int32_t code = filterInitFromNode((SNode *)pFilterNode, &filter, 0);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
SIFParam param = {0};
code = sifCalculate((SNode *)pFilterNode, &param);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
taosArrayAddAll(result, param.result);
sifFreeParam(&param);
return code;
}
SIdxFltStatus idxGetFltStatus(SNode *pFilterNode) {
if (pFilterNode == NULL) {
return SFLT_NOT_INDEX;
}
// impl later
return SFLT_ACCURATE_INDEX;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册