Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
46ec8300
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
46ec8300
编写于
4月 14, 2022
作者:
dengyihao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
enh: add tag filter operator
上级
585942ce
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
357 addition
and
271 deletion
+357
-271
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+305
-271
source/libs/executor/inc/indexoperator.h
source/libs/executor/inc/indexoperator.h
+20
-0
source/libs/executor/src/indexoperator.c
source/libs/executor/src/indexoperator.c
+32
-0
未找到文件。
source/libs/executor/inc/executorimpl.h
浏览文件 @
46ec8300
...
...
@@ -20,9 +20,9 @@ extern "C" {
#endif
#include "os.h"
#include "tsort.h"
#include "tcommon.h"
#include "tlosertree.h"
#include "tsort.h"
#include "ttszip.h"
#include "tvariant.h"
...
...
@@ -35,15 +35,15 @@ extern "C" {
#include "tarray.h"
#include "thash.h"
#include "tlockfree.h"
#include "tpagedbuf.h"
#include "tmsg.h"
#include "tpagedbuf.h"
struct
SColumnFilterElem
;
typedef
int32_t
(
*
__block_search_fn_t
)(
char
*
data
,
int32_t
num
,
int64_t
key
,
int32_t
order
);
#define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED)
#define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0u)
#define IS_QUERY_KILLED(_q)
((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED)
#define Q_STATUS_EQUAL(p, s)
(((p) & (s)) != 0u)
#define QUERY_IS_ASC_QUERY(q) (GET_FORWARD_DIRECTION_FACTOR((q)->order.order) == QUERY_ASC_FORWARD_STEP)
#define GET_TABLEGROUP(q, _index) ((SArray*)taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index)))
...
...
@@ -67,7 +67,7 @@ enum {
};
typedef
struct
SResultRowCell
{
uint64_t
groupId
;
uint64_t
groupId
;
SResultRowPosition
pos
;
}
SResultRowCell
;
...
...
@@ -75,19 +75,19 @@ typedef struct SResultRowCell {
* If the number of generated results is greater than this value,
* query query will be halt and return results to client immediate.
*/
typedef
struct
SResultInfo
{
// TODO refactor
int64_t
totalRows
;
// total generated result size in rows
int64_t
totalBytes
;
// total results in bytes.
int32_t
capacity
;
// capacity of current result output buffer
int32_t
threshold
;
// result size threshold in rows.
typedef
struct
SResultInfo
{
// TODO refactor
int64_t
totalRows
;
// total generated result size in rows
int64_t
totalBytes
;
// total results in bytes.
int32_t
capacity
;
// capacity of current result output buffer
int32_t
threshold
;
// result size threshold in rows.
}
SResultInfo
;
typedef
struct
STableQueryInfo
{
TSKEY
lastKey
;
// last check ts
uint64_t
uid
;
// table uid
int32_t
groupIndex
;
// group id in table list
// SVariant tag;
SResultRowInfo
resInfo
;
// result info
TSKEY
lastKey
;
// last check ts
uint64_t
uid
;
// table uid
int32_t
groupIndex
;
// group id in table list
// SVariant tag;
SResultRowInfo
resInfo
;
// result info
}
STableQueryInfo
;
typedef
enum
{
...
...
@@ -155,27 +155,27 @@ typedef struct SOperatorCostInfo {
// The basic query information extracted from the SQueryInfo tree to support the
// execution of query in a data node.
typedef
struct
STaskAttr
{
SLimit
limit
;
SLimit
slimit
;
bool
stableQuery
;
// super table query or not
bool
topBotQuery
;
// TODO used bitwise flag
bool
groupbyColumn
;
// denote if this is a groupby normal column query
bool
timeWindowInterpo
;
// if the time window start/end required interpolation
bool
tsCompQuery
;
// is tscomp query
bool
diffQuery
;
// is diff query
bool
pointInterpQuery
;
// point interpolation query
int32_t
havingNum
;
// having expr number
int16_t
numOfCols
;
int16_t
numOfTags
;
STimeWindow
window
;
SInterval
interval
;
int16_t
precision
;
int16_t
numOfOutput
;
int16_t
fillType
;
int32_t
resultRowSize
;
int32_t
tagLen
;
// tag value length of current query
SExprInfo
*
pExpr1
;
SLimit
limit
;
SLimit
slimit
;
bool
stableQuery
;
// super table query or not
bool
topBotQuery
;
// TODO used bitwise flag
bool
groupbyColumn
;
// denote if this is a groupby normal column query
bool
timeWindowInterpo
;
// if the time window start/end required interpolation
bool
tsCompQuery
;
// is tscomp query
bool
diffQuery
;
// is diff query
bool
pointInterpQuery
;
// point interpolation query
int32_t
havingNum
;
// having expr number
int16_t
numOfCols
;
int16_t
numOfTags
;
STimeWindow
window
;
SInterval
interval
;
int16_t
precision
;
int16_t
numOfOutput
;
int16_t
fillType
;
int32_t
resultRowSize
;
int32_t
tagLen
;
// tag value length of current query
SExprInfo
*
pExpr1
;
SColumnInfo
*
tagColList
;
int32_t
numOfFilterCols
;
int64_t
*
fillVal
;
...
...
@@ -188,13 +188,15 @@ struct SOperatorInfo;
struct
SAggSupporter
;
struct
SOptrBasicInfo
;
typedef
void
(
*
__optr_encode_fn_t
)(
struct
SOperatorInfo
*
pOperator
,
struct
SAggSupporter
*
pSup
,
struct
SOptrBasicInfo
*
pInfo
,
char
**
result
,
int32_t
*
length
);
typedef
bool
(
*
__optr_decode_fn_t
)(
struct
SOperatorInfo
*
pOperator
,
struct
SAggSupporter
*
pSup
,
struct
SOptrBasicInfo
*
pInfo
,
char
*
result
,
int32_t
length
);
typedef
void
(
*
__optr_encode_fn_t
)(
struct
SOperatorInfo
*
pOperator
,
struct
SAggSupporter
*
pSup
,
struct
SOptrBasicInfo
*
pInfo
,
char
**
result
,
int32_t
*
length
);
typedef
bool
(
*
__optr_decode_fn_t
)(
struct
SOperatorInfo
*
pOperator
,
struct
SAggSupporter
*
pSup
,
struct
SOptrBasicInfo
*
pInfo
,
char
*
result
,
int32_t
length
);
typedef
int32_t
(
*
__optr_open_fn_t
)(
struct
SOperatorInfo
*
pOptr
);
typedef
SSDataBlock
*
(
*
__optr_fn_t
)(
struct
SOperatorInfo
*
pOptr
,
bool
*
newgroup
);
typedef
void
(
*
__optr_close_fn_t
)(
void
*
param
,
int32_t
num
);
typedef
int32_t
(
*
__optr_get_explain_fn_t
)(
struct
SOperatorInfo
*
pOptr
,
void
**
pOptrExplain
);
typedef
int32_t
(
*
__optr_get_explain_fn_t
)(
struct
SOperatorInfo
*
pOptr
,
void
**
pOptrExplain
);
typedef
struct
STaskIdInfo
{
uint64_t
queryId
;
// this is also a request id
...
...
@@ -204,49 +206,49 @@ typedef struct STaskIdInfo {
}
STaskIdInfo
;
typedef
struct
SExecTaskInfo
{
STaskIdInfo
id
;
uint32_t
status
;
STimeWindow
window
;
STaskCostInfo
cost
;
int64_t
owner
;
// if it is in execution
int32_t
code
;
uint64_t
totalRows
;
// total number of rows
STableGroupInfo
tableqinfoGroupInfo
;
// this is a group array list, including SArray<STableQueryInfo*> structure
char
*
sql
;
// query sql string
jmp_buf
env
;
// jump to this position when error happens.
EOPTR_EXEC_MODEL
execModel
;
// operator execution model [batch model|stream model]
STaskIdInfo
id
;
uint32_t
status
;
STimeWindow
window
;
STaskCostInfo
cost
;
int64_t
owner
;
// if it is in execution
int32_t
code
;
uint64_t
totalRows
;
// total number of rows
STableGroupInfo
tableqinfoGroupInfo
;
// this is a group array list, including SArray<STableQueryInfo*> structure
char
*
sql
;
// query sql string
jmp_buf
env
;
// jump to this position when error happens.
EOPTR_EXEC_MODEL
execModel
;
// operator execution model [batch model|stream model]
struct
SOperatorInfo
*
pRoot
;
}
SExecTaskInfo
;
typedef
struct
STaskRuntimeEnv
{
jmp_buf
env
;
STaskAttr
*
pQueryAttr
;
uint32_t
status
;
// query status
void
*
qinfo
;
uint8_t
scanFlag
;
// denotes reversed scan of data or not
void
*
pTsdbReadHandle
;
int32_t
prevGroupId
;
// previous executed group id
bool
enableGroupData
;
SDiskbasedBuf
*
pResultBuf
;
// query result buffer based on blocked-wised disk file
SHashObj
*
pResultRowHashTable
;
// quick locate the window object for each result
SHashObj
*
pResultRowListSet
;
// used to check if current ResultRowInfo has ResultRow object or not
SArray
*
pResultRowArrayList
;
// The array list that contains the Result rows
char
*
keyBuf
;
// window key buffer
jmp_buf
env
;
STaskAttr
*
pQueryAttr
;
uint32_t
status
;
// query status
void
*
qinfo
;
uint8_t
scanFlag
;
// denotes reversed scan of data or not
void
*
pTsdbReadHandle
;
int32_t
prevGroupId
;
// previous executed group id
bool
enableGroupData
;
SDiskbasedBuf
*
pResultBuf
;
// query result buffer based on blocked-wised disk file
SHashObj
*
pResultRowHashTable
;
// quick locate the window object for each result
SHashObj
*
pResultRowListSet
;
// used to check if current ResultRowInfo has ResultRow object or not
SArray
*
pResultRowArrayList
;
// The array list that contains the Result rows
char
*
keyBuf
;
// window key buffer
// The window result objects pool, all the resultRow Objects are allocated and managed by this object.
char
**
prevRow
;
SArray
*
prevResult
;
// intermediate result, SArray<SInterResult>
STSBuf
*
pTsBuf
;
// timestamp filter list
STSCursor
cur
;
char
**
prevRow
;
SArray
*
prevResult
;
// intermediate result, SArray<SInterResult>
STSBuf
*
pTsBuf
;
// timestamp filter list
STSCursor
cur
;
char
*
tagVal
;
// tag value of current data block
char
*
tagVal
;
// tag value of current data block
struct
SScalarFunctionSupport
*
scalarSup
;
SSDataBlock
*
outputBuf
;
STableGroupInfo
tableqinfoGroupInfo
;
// this is a group array list, including SArray<STableQueryInfo*> structure
struct
SOperatorInfo
*
proot
;
SGroupResInfo
groupResInfo
;
int64_t
currentOffset
;
// dynamic offset value
SGroupResInfo
groupResInfo
;
int64_t
currentOffset
;
// dynamic offset value
STableQueryInfo
*
current
;
SResultInfo
resultInfo
;
...
...
@@ -255,10 +257,10 @@ typedef struct STaskRuntimeEnv {
}
STaskRuntimeEnv
;
enum
{
OP_NOT_OPENED
=
0x0
,
OP_OPENED
=
0x1
,
OP_NOT_OPENED
=
0x0
,
OP_OPENED
=
0x1
,
OP_RES_TO_RETURN
=
0x5
,
OP_EXEC_DONE
=
0x9
,
OP_EXEC_DONE
=
0x9
,
};
typedef
struct
SOperatorInfo
{
...
...
@@ -269,7 +271,7 @@ typedef struct SOperatorInfo {
char
*
name
;
// name, used to show the query execution plan
void
*
info
;
// extension attribution
SExprInfo
*
pExpr
;
STaskRuntimeEnv
*
pRuntimeEnv
;
// todo remove it
STaskRuntimeEnv
*
pRuntimeEnv
;
// todo remove it
SExecTaskInfo
*
pTaskInfo
;
SOperatorCostInfo
cost
;
SResultInfo
resultInfo
;
...
...
@@ -277,8 +279,8 @@ typedef struct SOperatorInfo {
int32_t
numOfDownstream
;
// number of downstream. The value is always ONE expect for join operator
__optr_open_fn_t
_openFn
;
// DO NOT invoke this function directly
__optr_fn_t
getNextFn
;
__optr_fn_t
getStreamResFn
;
// execute the aggregate in the stream model.
__optr_fn_t
cleanupFn
;
// call this function to release the allocated resources ASAP
__optr_fn_t
getStreamResFn
;
// execute the aggregate in the stream model.
__optr_fn_t
cleanupFn
;
// call this function to release the allocated resources ASAP
__optr_close_fn_t
closeFn
;
__optr_encode_fn_t
encodeResultRow
;
__optr_decode_fn_t
decodeResultRow
;
...
...
@@ -293,33 +295,33 @@ typedef struct {
typedef
enum
{
EX_SOURCE_DATA_NOT_READY
=
0x1
,
EX_SOURCE_DATA_READY
=
0x2
,
EX_SOURCE_DATA_READY
=
0x2
,
EX_SOURCE_DATA_EXHAUSTED
=
0x3
,
}
EX_SOURCE_STATUS
;
typedef
struct
SSourceDataInfo
{
struct
SExchangeInfo
*
pEx
;
struct
SExchangeInfo
*
pEx
;
int32_t
index
;
SRetrieveTableRsp
*
pRsp
;
SRetrieveTableRsp
*
pRsp
;
uint64_t
totalRows
;
int32_t
code
;
EX_SOURCE_STATUS
status
;
}
SSourceDataInfo
;
typedef
struct
SLoadRemoteDataInfo
{
uint64_t
totalSize
;
// total load bytes from remote
uint64_t
totalRows
;
// total number of rows
uint64_t
totalElapsed
;
// total elapsed time
uint64_t
totalSize
;
// total load bytes from remote
uint64_t
totalRows
;
// total number of rows
uint64_t
totalElapsed
;
// total elapsed time
}
SLoadRemoteDataInfo
;
typedef
struct
SExchangeInfo
{
SArray
*
pSources
;
SArray
*
pSourceDataInfo
;
tsem_t
ready
;
void
*
pTransporter
;
SSDataBlock
*
pResult
;
bool
seqLoadData
;
// sequential load data or not, false by default
int32_t
current
;
SArray
*
pSources
;
SArray
*
pSourceDataInfo
;
tsem_t
ready
;
void
*
pTransporter
;
SSDataBlock
*
pResult
;
bool
seqLoadData
;
// sequential load data or not, false by default
int32_t
current
;
SLoadRemoteDataInfo
loadInfo
;
}
SExchangeInfo
;
...
...
@@ -340,7 +342,7 @@ typedef struct STableScanInfo {
int32_t
current
;
int32_t
reverseTimes
;
// 0 by default
SNode
*
pFilterNode
;
// filter operator info
SqlFunctionCtx
*
pCtx
;
// next operator query context
SqlFunctionCtx
*
pCtx
;
// next operator query context
SResultRowInfo
*
pResultRowInfo
;
int32_t
*
rowCellInfoOffset
;
SExprInfo
*
pExpr
;
...
...
@@ -349,7 +351,7 @@ typedef struct STableScanInfo {
int32_t
numOfOutput
;
int64_t
elapsedTime
;
int32_t
prevGroupId
;
// previous table group id
int32_t
scanFlag
;
// table scan flag to denote if it is a repeat/reverse/main scan
int32_t
scanFlag
;
// table scan flag to denote if it is a repeat/reverse/main scan
}
STableScanInfo
;
typedef
struct
STagScanInfo
{
...
...
@@ -360,15 +362,15 @@ typedef struct STagScanInfo {
}
STagScanInfo
;
typedef
struct
SStreamBlockScanInfo
{
SArray
*
pBlockLists
;
// multiple SSDatablock.
SSDataBlock
*
pRes
;
// result SSDataBlock
int32_t
blockType
;
// current block type
int32_t
validBlockIndex
;
// Is current data has returned?
SColumnInfo
*
pCols
;
// the output column info
uint64_t
numOfRows
;
// total scanned rows
uint64_t
numOfExec
;
// execution times
void
*
readerHandle
;
// stream block reader handle
SArray
*
pColMatchInfo
;
//
SArray
*
pBlockLists
;
// multiple SSDatablock.
SSDataBlock
*
pRes
;
// result SSDataBlock
int32_t
blockType
;
// current block type
int32_t
validBlockIndex
;
// Is current data has returned?
SColumnInfo
*
pCols
;
// the output column info
uint64_t
numOfRows
;
// total scanned rows
uint64_t
numOfExec
;
// execution times
void
*
readerHandle
;
// stream block reader handle
SArray
*
pColMatchInfo
;
//
}
SStreamBlockScanInfo
;
typedef
struct
SSysTableScanInfo
{
...
...
@@ -377,83 +379,83 @@ typedef struct SSysTableScanInfo {
void
*
readHandle
;
};
SRetrieveMetaTableRsp
*
pRsp
;
SRetrieveTableReq
req
;
SEpSet
epSet
;
tsem_t
ready
;
SRetrieveMetaTableRsp
*
pRsp
;
SRetrieveTableReq
req
;
SEpSet
epSet
;
tsem_t
ready
;
int32_t
accountId
;
bool
showRewrite
;
SNode
*
pCondition
;
// db_name filter condition, to discard data that are not in current database
void
*
pCur
;
// cursor for iterate the local table meta store.
SArray
*
scanCols
;
// SArray<int16_t> scan column id list
int32_t
accountId
;
bool
showRewrite
;
SNode
*
pCondition
;
// db_name filter condition, to discard data that are not in current database
void
*
pCur
;
// cursor for iterate the local table meta store.
SArray
*
scanCols
;
// SArray<int16_t> scan column id list
int32_t
type
;
// show type, TODO remove it
int32_t
type
;
// show type, TODO remove it
SName
name
;
SSDataBlock
*
pRes
;
SSDataBlock
*
pRes
;
int32_t
capacity
;
int64_t
numOfBlocks
;
// extract basic running information.
SLoadRemoteDataInfo
loadInfo
;
}
SSysTableScanInfo
;
typedef
struct
SOptrBasicInfo
{
SResultRowInfo
resultRowInfo
;
int32_t
*
rowCellInfoOffset
;
// offset value for each row result cell info
SqlFunctionCtx
*
pCtx
;
SSDataBlock
*
pRes
;
int32_t
capacity
;
// TODO remove it
SResultRowInfo
resultRowInfo
;
int32_t
*
rowCellInfoOffset
;
// offset value for each row result cell info
SqlFunctionCtx
*
pCtx
;
SSDataBlock
*
pRes
;
int32_t
capacity
;
// TODO remove it
}
SOptrBasicInfo
;
//TODO move the resultrowsiz together with SOptrBasicInfo:rowCellInfoOffset
//
TODO move the resultrowsiz together with SOptrBasicInfo:rowCellInfoOffset
typedef
struct
SAggSupporter
{
SHashObj
*
pResultRowHashTable
;
// quick locate the window object for each result
SHashObj
*
pResultRowListSet
;
// used to check if current ResultRowInfo has ResultRow object or not
SArray
*
pResultRowArrayList
;
// The array list that contains the Result rows
char
*
keyBuf
;
// window key buffer
SDiskbasedBuf
*
pResultBuf
;
// query result buffer based on blocked-wised disk file
int32_t
resultRowSize
;
// the result buffer size for each result row, with the meta data size for each row
SHashObj
*
pResultRowHashTable
;
// quick locate the window object for each result
SHashObj
*
pResultRowListSet
;
// used to check if current ResultRowInfo has ResultRow object or not
SArray
*
pResultRowArrayList
;
// The array list that contains the Result rows
char
*
keyBuf
;
// window key buffer
SDiskbasedBuf
*
pResultBuf
;
// query result buffer based on blocked-wised disk file
int32_t
resultRowSize
;
// the result buffer size for each result row, with the meta data size for each row
}
SAggSupporter
;
typedef
struct
STableIntervalOperatorInfo
{
SOptrBasicInfo
binfo
;
// basic info
SGroupResInfo
groupResInfo
;
// multiple results build supporter
SInterval
interval
;
// interval info
int32_t
primaryTsIndex
;
// primary time stamp slot id from result of downstream operator.
STimeWindow
win
;
// query time range
bool
timeWindowInterpo
;
// interpolation needed or not
char
**
pRow
;
// previous row/tuple of already processed datablock
SAggSupporter
aggSup
;
// aggregate supporter
STableQueryInfo
*
pCurrent
;
// current tableQueryInfo struct
int32_t
order
;
// current SSDataBlock scan order
EOPTR_EXEC_MODEL
execModel
;
// operator execution model [batch model|stream model]
SArray
*
pUpdatedWindow
;
// updated time window due to the input data block from the downstream operator.
SColumnInfoData
timeWindowData
;
// query time window info for scalar function execution.
SOptrBasicInfo
binfo
;
// basic info
SGroupResInfo
groupResInfo
;
// multiple results build supporter
SInterval
interval
;
// interval info
int32_t
primaryTsIndex
;
// primary time stamp slot id from result of downstream operator.
STimeWindow
win
;
// query time range
bool
timeWindowInterpo
;
// interpolation needed or not
char
**
pRow
;
// previous row/tuple of already processed datablock
SAggSupporter
aggSup
;
// aggregate supporter
STableQueryInfo
*
pCurrent
;
// current tableQueryInfo struct
int32_t
order
;
// current SSDataBlock scan order
EOPTR_EXEC_MODEL
execModel
;
// operator execution model [batch model|stream model]
SArray
*
pUpdatedWindow
;
// updated time window due to the input data block from the downstream operator.
SColumnInfoData
timeWindowData
;
// query time window info for scalar function execution.
}
STableIntervalOperatorInfo
;
typedef
struct
SAggOperatorInfo
{
SOptrBasicInfo
binfo
;
SDiskbasedBuf
*
pResultBuf
;
// query result buffer based on blocked-wised disk file
SAggSupporter
aggSup
;
STableQueryInfo
*
current
;
uint32_t
groupId
;
SGroupResInfo
groupResInfo
;
STableQueryInfo
*
pTableQueryInfo
;
SOptrBasicInfo
binfo
;
SDiskbasedBuf
*
pResultBuf
;
// query result buffer based on blocked-wised disk file
SAggSupporter
aggSup
;
STableQueryInfo
*
current
;
uint32_t
groupId
;
SGroupResInfo
groupResInfo
;
STableQueryInfo
*
pTableQueryInfo
;
}
SAggOperatorInfo
;
typedef
struct
SProjectOperatorInfo
{
SOptrBasicInfo
binfo
;
SAggSupporter
aggSup
;
SSDataBlock
*
existDataBlock
;
SArray
*
pPseudoColInfo
;
SSDataBlock
*
existDataBlock
;
SArray
*
pPseudoColInfo
;
SLimit
limit
;
SLimit
slimit
;
uint64_t
groupId
;
int64_t
curSOffset
;
int64_t
curGroupOutput
;
uint64_t
groupId
;
int64_t
curSOffset
;
int64_t
curGroupOutput
;
int64_t
curOffset
;
int64_t
curOutput
;
int64_t
curOffset
;
int64_t
curOutput
;
}
SProjectOperatorInfo
;
typedef
struct
SFillOperatorInfo
{
...
...
@@ -468,165 +470,192 @@ typedef struct SFillOperatorInfo {
}
SFillOperatorInfo
;
typedef
struct
{
char
*
pData
;
bool
isNull
;
int16_t
type
;
int32_t
bytes
;
char
*
pData
;
bool
isNull
;
int16_t
type
;
int32_t
bytes
;
}
SGroupKeys
,
SStateKeys
;
typedef
struct
SGroupbyOperatorInfo
{
SOptrBasicInfo
binfo
;
SArray
*
pGroupCols
;
SArray
*
pGroupColVals
;
// current group column values, SArray<SGroupKeys>
SNode
*
pCondition
;
bool
isInit
;
// denote if current val is initialized or not
char
*
keyBuf
;
// group by keys for hash
int32_t
groupKeyLen
;
// total group by column width
SGroupResInfo
groupResInfo
;
SAggSupporter
aggSup
;
SExprInfo
*
pScalarExprInfo
;
int32_t
numOfScalarExpr
;
// the number of scalar expression in group operator
SqlFunctionCtx
*
pScalarFuncCtx
;
SOptrBasicInfo
binfo
;
SArray
*
pGroupCols
;
SArray
*
pGroupColVals
;
// current group column values, SArray<SGroupKeys>
SNode
*
pCondition
;
bool
isInit
;
// denote if current val is initialized or not
char
*
keyBuf
;
// group by keys for hash
int32_t
groupKeyLen
;
// total group by column width
SGroupResInfo
groupResInfo
;
SAggSupporter
aggSup
;
SExprInfo
*
pScalarExprInfo
;
int32_t
numOfScalarExpr
;
// the number of scalar expression in group operator
SqlFunctionCtx
*
pScalarFuncCtx
;
}
SGroupbyOperatorInfo
;
typedef
struct
SDataGroupInfo
{
uint64_t
groupId
;
int64_t
numOfRows
;
SArray
*
pPageList
;
SArray
*
pPageList
;
}
SDataGroupInfo
;
// The sort in partition may be needed later.
typedef
struct
SPartitionOperatorInfo
{
SOptrBasicInfo
binfo
;
SArray
*
pGroupCols
;
SArray
*
pGroupColVals
;
// current group column values, SArray<SGroupKeys>
char
*
keyBuf
;
// group by keys for hash
int32_t
groupKeyLen
;
// total group by column width
SHashObj
*
pGroupSet
;
// quick locate the window object for each result
SDiskbasedBuf
*
pBuf
;
// query result buffer based on blocked-wised disk file
int32_t
rowCapacity
;
// maximum number of rows for each buffer page
int32_t
*
columnOffset
;
// start position for each column data
void
*
pGroupIter
;
// group iterator
int32_t
pageIndex
;
// page index of current group
SOptrBasicInfo
binfo
;
SArray
*
pGroupCols
;
SArray
*
pGroupColVals
;
// current group column values, SArray<SGroupKeys>
char
*
keyBuf
;
// group by keys for hash
int32_t
groupKeyLen
;
// total group by column width
SHashObj
*
pGroupSet
;
// quick locate the window object for each result
SDiskbasedBuf
*
pBuf
;
// query result buffer based on blocked-wised disk file
int32_t
rowCapacity
;
// maximum number of rows for each buffer page
int32_t
*
columnOffset
;
// start position for each column data
void
*
pGroupIter
;
// group iterator
int32_t
pageIndex
;
// page index of current group
}
SPartitionOperatorInfo
;
typedef
struct
SWindowRowsSup
{
STimeWindow
win
;
TSKEY
prevTs
;
int32_t
startRowIndex
;
int32_t
numOfRows
;
STimeWindow
win
;
TSKEY
prevTs
;
int32_t
startRowIndex
;
int32_t
numOfRows
;
}
SWindowRowsSup
;
typedef
struct
SSessionAggOperatorInfo
{
SOptrBasicInfo
binfo
;
SAggSupporter
aggSup
;
SGroupResInfo
groupResInfo
;
SWindowRowsSup
winSup
;
bool
reptScan
;
// next round scan
int64_t
gap
;
// session window gap
SColumnInfoData
timeWindowData
;
// query time window info for scalar function execution.
SOptrBasicInfo
binfo
;
SAggSupporter
aggSup
;
SGroupResInfo
groupResInfo
;
SWindowRowsSup
winSup
;
bool
reptScan
;
// next round scan
int64_t
gap
;
// session window gap
SColumnInfoData
timeWindowData
;
// query time window info for scalar function execution.
}
SSessionAggOperatorInfo
;
typedef
struct
STimeSliceOperatorInfo
{
SOptrBasicInfo
binfo
;
SInterval
interval
;
SGroupResInfo
groupResInfo
;
// multiple results build supporter
SOptrBasicInfo
binfo
;
SInterval
interval
;
SGroupResInfo
groupResInfo
;
// multiple results build supporter
}
STimeSliceOperatorInfo
;
typedef
struct
SStateWindowOperatorInfo
{
SOptrBasicInfo
binfo
;
SAggSupporter
aggSup
;
SGroupResInfo
groupResInfo
;
SWindowRowsSup
winSup
;
int32_t
colIndex
;
// start row index
bool
hasKey
;
SStateKeys
stateKey
;
SColumnInfoData
timeWindowData
;
// query time window info for scalar function execution.
// bool reptScan;
SOptrBasicInfo
binfo
;
SAggSupporter
aggSup
;
SGroupResInfo
groupResInfo
;
SWindowRowsSup
winSup
;
int32_t
colIndex
;
// start row index
bool
hasKey
;
SStateKeys
stateKey
;
SColumnInfoData
timeWindowData
;
// query time window info for scalar function execution.
// bool reptScan;
}
SStateWindowOperatorInfo
;
typedef
struct
SSortedMergeOperatorInfo
{
SOptrBasicInfo
binfo
;
bool
hasVarCol
;
SArray
*
pSortInfo
;
int32_t
numOfSources
;
SSortHandle
*
pSortHandle
;
int32_t
bufPageSize
;
uint32_t
sortBufSize
;
// max buffer size for in-memory sort
int32_t
resultRowFactor
;
bool
hasGroupVal
;
SDiskbasedBuf
*
pTupleStore
;
// keep the final results
int32_t
numOfResPerPage
;
char
**
groupVal
;
SArray
*
groupInfo
;
SAggSupporter
aggSup
;
SOptrBasicInfo
binfo
;
bool
hasVarCol
;
SArray
*
pSortInfo
;
int32_t
numOfSources
;
SSortHandle
*
pSortHandle
;
int32_t
bufPageSize
;
uint32_t
sortBufSize
;
// max buffer size for in-memory sort
int32_t
resultRowFactor
;
bool
hasGroupVal
;
SDiskbasedBuf
*
pTupleStore
;
// keep the final results
int32_t
numOfResPerPage
;
char
**
groupVal
;
SArray
*
groupInfo
;
SAggSupporter
aggSup
;
}
SSortedMergeOperatorInfo
;
typedef
struct
SSortOperatorInfo
{
uint32_t
sortBufSize
;
// max buffer size for in-memory sort
SSDataBlock
*
pDataBlock
;
SArray
*
pSortInfo
;
SSortHandle
*
pSortHandle
;
SArray
*
inputSlotMap
;
// for index map from table scan output
int32_t
bufPageSize
;
int32_t
numOfRowsInRes
;
uint32_t
sortBufSize
;
// max buffer size for in-memory sort
SSDataBlock
*
pDataBlock
;
SArray
*
pSortInfo
;
SSortHandle
*
pSortHandle
;
SArray
*
inputSlotMap
;
// for index map from table scan output
int32_t
bufPageSize
;
int32_t
numOfRowsInRes
;
// TODO extact struct
int64_t
startTs
;
// sort start time
uint64_t
sortElapsed
;
// sort elapsed time, time to flush to disk not included.
uint64_t
totalSize
;
// total load bytes from remote
uint64_t
totalRows
;
// total number of rows
uint64_t
totalElapsed
;
// total elapsed time
int64_t
startTs
;
// sort start time
uint64_t
sortElapsed
;
// sort elapsed time, time to flush to disk not included.
uint64_t
totalSize
;
// total load bytes from remote
uint64_t
totalRows
;
// total number of rows
uint64_t
totalElapsed
;
// total elapsed time
}
SSortOperatorInfo
;
typedef
struct
STagFilterOperatorInfo
{
SOptrBasicInfo
binfo
;
}
STagFilterOperatorInfo
;
int32_t
operatorDummyOpenFn
(
SOperatorInfo
*
pOperator
);
void
operatorDummyCloseFn
(
void
*
param
,
int32_t
numOfCols
);
void
operatorDummyCloseFn
(
void
*
param
,
int32_t
numOfCols
);
int32_t
appendDownstream
(
SOperatorInfo
*
p
,
SOperatorInfo
**
pDownstream
,
int32_t
num
);
int32_t
initAggInfo
(
SOptrBasicInfo
*
pBasicInfo
,
SAggSupporter
*
pAggSup
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
int32_t
numOfRows
,
SSDataBlock
*
pResultBlock
,
size_t
keyBufSize
,
const
char
*
pkey
);
void
toSDatablock
(
SSDataBlock
*
pBlock
,
int32_t
rowCapacity
,
SGroupResInfo
*
pGroupResInfo
,
SExprInfo
*
pExprInfo
,
SDiskbasedBuf
*
pBuf
,
int32_t
*
rowCellOffset
);
void
finalizeMultiTupleQueryResult
(
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
SDiskbasedBuf
*
pBuf
,
SResultRowInfo
*
pResultRowInfo
,
int32_t
*
rowCellInfoOffset
);
void
doApplyFunctions
(
SqlFunctionCtx
*
pCtx
,
STimeWindow
*
pWin
,
SColumnInfoData
*
pTimeWindowData
,
int32_t
offset
,
int32_t
forwardStep
,
TSKEY
*
tsCol
,
int32_t
numOfTotal
,
int32_t
numOfOutput
,
int32_t
order
);
int32_t
setGroupResultOutputBuf
(
SOptrBasicInfo
*
binfo
,
int32_t
numOfCols
,
char
*
pData
,
int16_t
type
,
int16_t
bytes
,
int32_t
groupId
,
SDiskbasedBuf
*
pBuf
,
SExecTaskInfo
*
pTaskInfo
,
SAggSupporter
*
pAggSup
);
void
doDestroyBasicInfo
(
SOptrBasicInfo
*
pInfo
,
int32_t
numOfOutput
);
int32_t
setSDataBlockFromFetchRsp
(
SSDataBlock
*
pRes
,
SLoadRemoteDataInfo
*
pLoadInfo
,
int32_t
numOfRows
,
char
*
pData
,
int32_t
compLen
,
int32_t
numOfOutput
,
int64_t
startTs
,
uint64_t
*
total
,
SArray
*
pColList
);
void
doSetOperatorCompleted
(
SOperatorInfo
*
pOperator
);
void
doFilter
(
const
SNode
*
pFilterNode
,
SSDataBlock
*
pBlock
);
void
toSDatablock
(
SSDataBlock
*
pBlock
,
int32_t
rowCapacity
,
SGroupResInfo
*
pGroupResInfo
,
SExprInfo
*
pExprInfo
,
SDiskbasedBuf
*
pBuf
,
int32_t
*
rowCellOffset
);
void
finalizeMultiTupleQueryResult
(
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
SDiskbasedBuf
*
pBuf
,
SResultRowInfo
*
pResultRowInfo
,
int32_t
*
rowCellInfoOffset
);
void
doApplyFunctions
(
SqlFunctionCtx
*
pCtx
,
STimeWindow
*
pWin
,
SColumnInfoData
*
pTimeWindowData
,
int32_t
offset
,
int32_t
forwardStep
,
TSKEY
*
tsCol
,
int32_t
numOfTotal
,
int32_t
numOfOutput
,
int32_t
order
);
int32_t
setGroupResultOutputBuf
(
SOptrBasicInfo
*
binfo
,
int32_t
numOfCols
,
char
*
pData
,
int16_t
type
,
int16_t
bytes
,
int32_t
groupId
,
SDiskbasedBuf
*
pBuf
,
SExecTaskInfo
*
pTaskInfo
,
SAggSupporter
*
pAggSup
);
void
doDestroyBasicInfo
(
SOptrBasicInfo
*
pInfo
,
int32_t
numOfOutput
);
int32_t
setSDataBlockFromFetchRsp
(
SSDataBlock
*
pRes
,
SLoadRemoteDataInfo
*
pLoadInfo
,
int32_t
numOfRows
,
char
*
pData
,
int32_t
compLen
,
int32_t
numOfOutput
,
int64_t
startTs
,
uint64_t
*
total
,
SArray
*
pColList
);
void
doSetOperatorCompleted
(
SOperatorInfo
*
pOperator
);
void
doFilter
(
const
SNode
*
pFilterNode
,
SSDataBlock
*
pBlock
);
SqlFunctionCtx
*
createSqlFunctionCtx
(
SExprInfo
*
pExprInfo
,
int32_t
numOfOutput
,
int32_t
**
rowCellInfoOffset
);
SOperatorInfo
*
createExchangeOperatorInfo
(
const
SNodeList
*
pSources
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createTableScanOperatorInfo
(
void
*
pTsdbReadHandle
,
int32_t
order
,
int32_t
numOfCols
,
int32_t
repeatTime
,
int32_t
reverseTime
,
SArray
*
pColMatchInfo
,
SSDataBlock
*
pResBlock
,
SNode
*
pCondition
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createAggregateOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResultBlock
,
SExecTaskInfo
*
pTaskInfo
,
const
STableGroupInfo
*
pTableGroupInfo
);
SOperatorInfo
*
createMultiTableAggOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
SExecTaskInfo
*
pTaskInfo
,
const
STableGroupInfo
*
pTableGroupInfo
);
SOperatorInfo
*
createProjectOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
num
,
SSDataBlock
*
pResBlock
,
SLimit
*
pLimit
,
SLimit
*
pSlimit
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createSortOperatorInfo
(
SOperatorInfo
*
downstream
,
SSDataBlock
*
pResBlock
,
SArray
*
pSortInfo
,
SArray
*
pIndexMap
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createSortedMergeOperatorInfo
(
SOperatorInfo
**
downstream
,
int32_t
numOfDownstream
,
SExprInfo
*
pExprInfo
,
int32_t
num
,
SArray
*
pSortInfo
,
SArray
*
pGroupInfo
,
SExecTaskInfo
*
pTaskInfo
);
int32_t
reverseTime
,
SArray
*
pColMatchInfo
,
SSDataBlock
*
pResBlock
,
SNode
*
pCondition
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createAggregateOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResultBlock
,
SExecTaskInfo
*
pTaskInfo
,
const
STableGroupInfo
*
pTableGroupInfo
);
SOperatorInfo
*
createMultiTableAggOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
SExecTaskInfo
*
pTaskInfo
,
const
STableGroupInfo
*
pTableGroupInfo
);
SOperatorInfo
*
createProjectOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
num
,
SSDataBlock
*
pResBlock
,
SLimit
*
pLimit
,
SLimit
*
pSlimit
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createSortOperatorInfo
(
SOperatorInfo
*
downstream
,
SSDataBlock
*
pResBlock
,
SArray
*
pSortInfo
,
SArray
*
pIndexMap
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createSortedMergeOperatorInfo
(
SOperatorInfo
**
downstream
,
int32_t
numOfDownstream
,
SExprInfo
*
pExprInfo
,
int32_t
num
,
SArray
*
pSortInfo
,
SArray
*
pGroupInfo
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createSysTableScanOperatorInfo
(
void
*
pSysTableReadHandle
,
SSDataBlock
*
pResBlock
,
const
SName
*
pName
,
SNode
*
pCondition
,
SEpSet
epset
,
SArray
*
colList
,
SExecTaskInfo
*
pTaskInfo
,
bool
showRewrite
,
int32_t
accountId
);
SOperatorInfo
*
createIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
SInterval
*
pInterval
,
int32_t
primaryTsSlot
,
SNode
*
pCondition
,
SEpSet
epset
,
SArray
*
colList
,
SExecTaskInfo
*
pTaskInfo
,
bool
showRewrite
,
int32_t
accountId
);
SOperatorInfo
*
createIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
SInterval
*
pInterval
,
int32_t
primaryTsSlot
,
const
STableGroupInfo
*
pTableGroupInfo
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createSessionAggOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
int64_t
gap
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createGroupOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResultBlock
,
SArray
*
pGroupColList
,
SNode
*
pCondition
,
SExprInfo
*
pScalarExprInfo
,
int32_t
numOfScalarExpr
,
SExecTaskInfo
*
pTaskInfo
,
const
STableGroupInfo
*
pTableGroupInfo
);
SOperatorInfo
*
createSessionAggOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
int64_t
gap
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createGroupOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResultBlock
,
SArray
*
pGroupColList
,
SNode
*
pCondition
,
SExprInfo
*
pScalarExprInfo
,
int32_t
numOfScalarExpr
,
SExecTaskInfo
*
pTaskInfo
,
const
STableGroupInfo
*
pTableGroupInfo
);
SOperatorInfo
*
createDataBlockInfoScanOperator
(
void
*
dataReader
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStreamScanOperatorInfo
(
void
*
streamReadHandle
,
SSDataBlock
*
pResBlock
,
SArray
*
pColList
,
SArray
*
pTableIdList
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStreamScanOperatorInfo
(
void
*
streamReadHandle
,
SSDataBlock
*
pResBlock
,
SArray
*
pColList
,
SArray
*
pTableIdList
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createFillOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExpr
,
int32_t
numOfCols
,
SInterval
*
pInterval
,
SSDataBlock
*
pResBlock
,
int32_t
fillType
,
char
*
fillVal
,
bool
multigroupResult
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStatewindowOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExpr
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createFillOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExpr
,
int32_t
numOfCols
,
SInterval
*
pInterval
,
SSDataBlock
*
pResBlock
,
int32_t
fillType
,
char
*
fillVal
,
bool
multigroupResult
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStatewindowOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExpr
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createPartitionOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResultBlock
,
SArray
*
pGroupColList
,
SExecTaskInfo
*
pTaskInfo
,
const
STableGroupInfo
*
pTableGroupInfo
);
SOperatorInfo
*
createTimeSliceOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResultBlock
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createPartitionOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResultBlock
,
SArray
*
pGroupColList
,
SExecTaskInfo
*
pTaskInfo
,
const
STableGroupInfo
*
pTableGroupInfo
);
SOperatorInfo
*
createTimeSliceOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResultBlock
,
SExecTaskInfo
*
pTaskInfo
);
#if 0
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
...
...
@@ -640,7 +669,8 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOf
int32_t numOfOutput);
#endif
void
projectApplyFunctions
(
SExprInfo
*
pExpr
,
SSDataBlock
*
pResult
,
SSDataBlock
*
pSrcBlock
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
SArray
*
pPseudoList
);
void
projectApplyFunctions
(
SExprInfo
*
pExpr
,
SSDataBlock
*
pResult
,
SSDataBlock
*
pSrcBlock
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
SArray
*
pPseudoList
);
void
setInputDataBlock
(
SOperatorInfo
*
pOperator
,
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
,
int32_t
order
);
...
...
@@ -652,23 +682,27 @@ STableQueryInfo* createTableQueryInfo(void* buf, bool groupbyColumn, STimeWindow
bool
isTaskKilled
(
SExecTaskInfo
*
pTaskInfo
);
int32_t
checkForQueryBuf
(
size_t
numOfTables
);
void
setTaskKilled
(
SExecTaskInfo
*
pTaskInfo
);
void
setTaskKilled
(
SExecTaskInfo
*
pTaskInfo
);
void
publishOperatorProfEvent
(
SOperatorInfo
*
operatorInfo
,
EQueryProfEventType
eventType
);
void
publishQueryAbortEvent
(
SExecTaskInfo
*
pTaskInfo
,
int32_t
code
);
void
queryCostStatis
(
SExecTaskInfo
*
pTaskInfo
);
void
doDestroyTask
(
SExecTaskInfo
*
pTaskInfo
);
void
doDestroyTask
(
SExecTaskInfo
*
pTaskInfo
);
int32_t
getMaximumIdleDurationSec
();
void
doInvokeUdf
(
struct
SUdfInfo
*
pUdfInfo
,
SqlFunctionCtx
*
pCtx
,
int32_t
idx
,
int32_t
type
);
void
setTaskStatus
(
SExecTaskInfo
*
pTaskInfo
,
int8_t
status
);
int32_t
createExecTaskInfoImpl
(
SSubplan
*
pPlan
,
SExecTaskInfo
**
pTaskInfo
,
SReadHandle
*
pHandle
,
uint64_t
taskId
,
EOPTR_EXEC_MODEL
model
);
int32_t
getOperatorExplainExecInfo
(
SOperatorInfo
*
operatorInfo
,
SExplainExecInfo
**
pRes
,
int32_t
*
capacity
,
int32_t
*
resNum
);
bool
aggDecodeResultRow
(
SOperatorInfo
*
pOperator
,
SAggSupporter
*
pSup
,
SOptrBasicInfo
*
pInfo
,
char
*
result
,
int32_t
length
);
void
aggEncodeResultRow
(
SOperatorInfo
*
pOperator
,
SAggSupporter
*
pSup
,
SOptrBasicInfo
*
pInfo
,
char
**
result
,
int32_t
*
length
);
int32_t
createExecTaskInfoImpl
(
SSubplan
*
pPlan
,
SExecTaskInfo
**
pTaskInfo
,
SReadHandle
*
pHandle
,
uint64_t
taskId
,
EOPTR_EXEC_MODEL
model
);
int32_t
getOperatorExplainExecInfo
(
SOperatorInfo
*
operatorInfo
,
SExplainExecInfo
**
pRes
,
int32_t
*
capacity
,
int32_t
*
resNum
);
bool
aggDecodeResultRow
(
SOperatorInfo
*
pOperator
,
SAggSupporter
*
pSup
,
SOptrBasicInfo
*
pInfo
,
char
*
result
,
int32_t
length
);
void
aggEncodeResultRow
(
SOperatorInfo
*
pOperator
,
SAggSupporter
*
pSup
,
SOptrBasicInfo
*
pInfo
,
char
**
result
,
int32_t
*
length
);
#ifdef __cplusplus
}
...
...
source/libs/executor/inc/indexoperator.h
0 → 100644
浏览文件 @
46ec8300
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "filter.h"
#include "tglobal.h"
// construct tag filter operator later
int32_t
doFilterTag
(
const
SNode
*
pFilterNode
,
SArray
*
resutl
);
source/libs/executor/src/indexoperator.c
0 → 100644
浏览文件 @
46ec8300
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "indexoperator.h"
#include "executorimpl.h"
// construct tag filter operator later
static
void
destroyTagFilterOperatorInfo
(
void
*
param
)
{
STagFilterOperatorInfo
*
pInfo
=
(
STagFilterOperatorInfo
*
)
param
;
}
int32_t
doFilterTag
(
const
SNode
*
pFilterNode
,
SArray
*
resutl
)
{
if
(
pFilterNode
==
NULL
)
{
return
TSDB_CODE_SUCCESS
;
}
SFilterInfo
*
filter
=
NULL
;
// todo move to the initialization function
int32_t
code
=
filterInitFromNode
((
SNode
*
)
pFilterNode
,
&
filter
,
0
);
return
code
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录