Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a0c2b347
T
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
a0c2b347
编写于
4月 28, 2023
作者:
H
Haojun Liao
提交者:
GitHub
4月 28, 2023
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #21119 from taosdata/fix/liaohj_main
refactor: do some internal refactor.
上级
f850388e
da0d9c78
变更
31
显示空白变更内容
内联
并排
Showing
31 changed file
with
814 addition
and
876 deletion
+814
-876
source/dnode/vnode/src/tq/tqRestore.c
source/dnode/vnode/src/tq/tqRestore.c
+1
-1
source/dnode/vnode/src/tq/tqScan.c
source/dnode/vnode/src/tq/tqScan.c
+1
-0
source/libs/executor/inc/executorInt.h
source/libs/executor/inc/executorInt.h
+640
-4
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+0
-699
source/libs/executor/inc/operator.h
source/libs/executor/inc/operator.h
+1
-0
source/libs/executor/inc/querytask.h
source/libs/executor/inc/querytask.h
+29
-0
source/libs/executor/src/aggregateoperator.c
source/libs/executor/src/aggregateoperator.c
+5
-5
source/libs/executor/src/cachescanoperator.c
source/libs/executor/src/cachescanoperator.c
+3
-3
source/libs/executor/src/dataDeleter.c
source/libs/executor/src/dataDeleter.c
+1
-1
source/libs/executor/src/dataDispatcher.c
source/libs/executor/src/dataDispatcher.c
+1
-1
source/libs/executor/src/dataInserter.c
source/libs/executor/src/dataInserter.c
+1
-1
source/libs/executor/src/eventwindowoperator.c
source/libs/executor/src/eventwindowoperator.c
+3
-3
source/libs/executor/src/exchangeoperator.c
source/libs/executor/src/exchangeoperator.c
+8
-8
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+2
-2
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+25
-5
source/libs/executor/src/executorInt.c
source/libs/executor/src/executorInt.c
+3
-56
source/libs/executor/src/filloperator.c
source/libs/executor/src/filloperator.c
+1
-1
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+2
-3
source/libs/executor/src/joinoperator.c
source/libs/executor/src/joinoperator.c
+3
-3
source/libs/executor/src/operator.c
source/libs/executor/src/operator.c
+59
-61
source/libs/executor/src/projectoperator.c
source/libs/executor/src/projectoperator.c
+1
-1
source/libs/executor/src/querytask.c
source/libs/executor/src/querytask.c
+3
-3
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+1
-1
source/libs/executor/src/sortoperator.c
source/libs/executor/src/sortoperator.c
+2
-2
source/libs/executor/src/sysscanoperator.c
source/libs/executor/src/sysscanoperator.c
+1
-1
source/libs/executor/src/tfill.c
source/libs/executor/src/tfill.c
+1
-1
source/libs/executor/src/timesliceoperator.c
source/libs/executor/src/timesliceoperator.c
+3
-3
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+9
-3
source/libs/executor/test/executorTests.cpp
source/libs/executor/test/executorTests.cpp
+2
-2
source/libs/executor/test/lhashTests.cpp
source/libs/executor/test/lhashTests.cpp
+1
-1
source/libs/executor/test/sortTests.cpp
source/libs/executor/test/sortTests.cpp
+1
-1
未找到文件。
source/dnode/vnode/src/tq/tqRestore.c
浏览文件 @
a0c2b347
...
@@ -107,7 +107,7 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
...
@@ -107,7 +107,7 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
if
(
streamTaskShouldStop
(
&
pTask
->
status
)
||
status
==
TASK_STATUS__RECOVER_PREPARE
||
if
(
streamTaskShouldStop
(
&
pTask
->
status
)
||
status
==
TASK_STATUS__RECOVER_PREPARE
||
status
==
TASK_STATUS__WAIT_DOWNSTREAM
)
{
status
==
TASK_STATUS__WAIT_DOWNSTREAM
)
{
tqDebug
(
"s-task:%s skip push data, not ready for processing, status
%d"
,
pTask
->
id
.
idStr
,
status
);
tqDebug
(
"s-task:%s skip push data, not ready for processing, status
:
%d"
,
pTask
->
id
.
idStr
,
status
);
streamMetaReleaseTask
(
pStreamMeta
,
pTask
);
streamMetaReleaseTask
(
pStreamMeta
,
pTask
);
continue
;
continue
;
}
}
...
...
source/dnode/vnode/src/tq/tqScan.c
浏览文件 @
a0c2b347
...
@@ -130,6 +130,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
...
@@ -130,6 +130,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
tqError
(
"vgId:%d, task exec error since %s"
,
pTq
->
pVnode
->
config
.
vgId
,
terrstr
());
tqError
(
"vgId:%d, task exec error since %s"
,
pTq
->
pVnode
->
config
.
vgId
,
terrstr
());
return
-
1
;
return
-
1
;
}
}
tqDebug
(
"tmqsnap task execute end, get %p"
,
pDataBlock
);
tqDebug
(
"tmqsnap task execute end, get %p"
,
pDataBlock
);
if
(
pDataBlock
!=
NULL
&&
pDataBlock
->
info
.
rows
>
0
)
{
if
(
pDataBlock
!=
NULL
&&
pDataBlock
->
info
.
rows
>
0
)
{
...
...
source/libs/executor/inc/executorInt.h
浏览文件 @
a0c2b347
...
@@ -12,14 +12,75 @@
...
@@ -12,14 +12,75 @@
* You should have received a copy of the GNU Affero General Public License
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#ifndef TDENGINE_EXECUTORINT_H
#ifndef _TD_EXECUTOR_INT_H
#define TDENGINE_EXECUTORINT_H
#define _TD_EXECUTOR_INT_H
#ifdef __cplusplus
#ifdef __cplusplus
extern
"C"
{
extern
"C"
{
#endif
#endif
#include "os.h"
#include "tcommon.h"
#include "tlosertree.h"
#include "tsort.h"
#include "ttszip.h"
#include "tvariant.h"
#include "dataSinkMgt.h"
#include "executil.h"
#include "executor.h"
#include "planner.h"
#include "scalar.h"
#include "taosdef.h"
#include "tarray.h"
#include "tfill.h"
#include "thash.h"
#include "tlockfree.h"
#include "tmsg.h"
#include "tpagedbuf.h"
#include "tstream.h"
#include "tstreamUpdate.h"
#include "vnode.h"
typedef
int32_t
(
*
__block_search_fn_t
)(
char
*
data
,
int32_t
num
,
int64_t
key
,
int32_t
order
);
#define IS_VALID_SESSION_WIN(winInfo) ((winInfo).sessionWin.win.skey > 0)
#define SET_SESSION_WIN_INVALID(winInfo) ((winInfo).sessionWin.win.skey = INT64_MIN)
#define IS_INVALID_SESSION_WIN_KEY(winKey) ((winKey).win.skey <= 0)
#define SET_SESSION_WIN_KEY_INVALID(pWinKey) ((pWinKey)->win.skey = INT64_MIN)
/**
* If the number of generated results is greater than this value,
* query query will be halt and return results to client immediate.
*/
typedef
struct
SResultInfo
{
// TODO refactor
int64_t
totalRows
;
// total generated result size in rows
int64_t
totalBytes
;
// total results in bytes.
int32_t
capacity
;
// capacity of current result output buffer
int32_t
threshold
;
// result size threshold in rows.
}
SResultInfo
;
typedef
struct
STableQueryInfo
{
TSKEY
lastKey
;
// last check ts, todo remove it later
SResultRowPosition
pos
;
// current active time window
}
STableQueryInfo
;
typedef
struct
SLimit
{
int64_t
limit
;
int64_t
offset
;
}
SLimit
;
typedef
struct
STableScanAnalyzeInfo
SFileBlockLoadRecorder
;
enum
{
STREAM_RECOVER_STEP__NONE
=
0
,
STREAM_RECOVER_STEP__PREPARE1
,
STREAM_RECOVER_STEP__PREPARE2
,
STREAM_RECOVER_STEP__SCAN1
,
STREAM_RECOVER_STEP__SCAN2
,
};
extern
int32_t
exchangeObjRefPool
;
extern
int32_t
exchangeObjRefPool
;
typedef
struct
{
typedef
struct
{
...
@@ -29,9 +90,584 @@ typedef struct {
...
@@ -29,9 +90,584 @@ typedef struct {
int32_t
bytes
;
int32_t
bytes
;
}
SGroupKeys
,
SStateKeys
;
}
SGroupKeys
,
SStateKeys
;
typedef
struct
{
char
*
tablename
;
char
*
dbname
;
int32_t
tversion
;
SSchemaWrapper
*
sw
;
SSchemaWrapper
*
qsw
;
}
SSchemaInfo
;
typedef
struct
SExchangeOpStopInfo
{
int32_t
operatorType
;
int64_t
refId
;
}
SExchangeOpStopInfo
;
typedef
struct
SExprSupp
{
SExprInfo
*
pExprInfo
;
int32_t
numOfExprs
;
// the number of scalar expression in group operator
SqlFunctionCtx
*
pCtx
;
int32_t
*
rowEntryInfoOffset
;
// offset value for each row result cell info
SFilterInfo
*
pFilterInfo
;
}
SExprSupp
;
typedef
enum
{
EX_SOURCE_DATA_NOT_READY
=
0x1
,
EX_SOURCE_DATA_READY
=
0x2
,
EX_SOURCE_DATA_EXHAUSTED
=
0x3
,
}
EX_SOURCE_STATUS
;
#define COL_MATCH_FROM_COL_ID 0x1
#define COL_MATCH_FROM_SLOT_ID 0x2
typedef
struct
SLoadRemoteDataInfo
{
uint64_t
totalSize
;
// total load bytes from remote
uint64_t
totalRows
;
// total number of rows
uint64_t
totalElapsed
;
// total elapsed time
}
SLoadRemoteDataInfo
;
typedef
struct
SLimitInfo
{
SLimit
limit
;
SLimit
slimit
;
uint64_t
currentGroupId
;
int64_t
remainGroupOffset
;
int64_t
numOfOutputGroups
;
int64_t
remainOffset
;
int64_t
numOfOutputRows
;
}
SLimitInfo
;
typedef
struct
SExchangeInfo
{
SArray
*
pSources
;
SArray
*
pSourceDataInfo
;
tsem_t
ready
;
void
*
pTransporter
;
// SArray<SSDataBlock*>, result block list, used to keep the multi-block that
// passed by downstream operator
SArray
*
pResultBlockList
;
SArray
*
pRecycledBlocks
;
// build a pool for small data block to avoid to repeatly create and then destroy.
SSDataBlock
*
pDummyBlock
;
// dummy block, not keep data
bool
seqLoadData
;
// sequential load data or not, false by default
int32_t
current
;
SLoadRemoteDataInfo
loadInfo
;
uint64_t
self
;
SLimitInfo
limitInfo
;
int64_t
openedTs
;
// start exec time stamp, todo: move to SLoadRemoteDataInfo
}
SExchangeInfo
;
typedef
struct
SScanInfo
{
int32_t
numOfAsc
;
int32_t
numOfDesc
;
}
SScanInfo
;
typedef
struct
SSampleExecInfo
{
double
sampleRatio
;
// data block sample ratio, 1 by default
uint32_t
seed
;
// random seed value
}
SSampleExecInfo
;
enum
{
TABLE_SCAN__TABLE_ORDER
=
1
,
TABLE_SCAN__BLOCK_ORDER
=
2
,
};
typedef
struct
SAggSupporter
{
SSHashObj
*
pResultRowHashTable
;
// quick locate the window object for each result
char
*
keyBuf
;
// window key buffer
SDiskbasedBuf
*
pResultBuf
;
// query result buffer based on blocked-wised disk file
int32_t
resultRowSize
;
// the result buffer size for each result row, with the meta data size for each row
int32_t
currentPageId
;
// current write page id
}
SAggSupporter
;
typedef
struct
{
// if the upstream is an interval operator, the interval info is also kept here to get the time window to check if
// current data block needs to be loaded.
SInterval
interval
;
SAggSupporter
*
pAggSup
;
SExprSupp
*
pExprSup
;
// expr supporter of aggregate operator
}
SAggOptrPushDownInfo
;
typedef
struct
STableMetaCacheInfo
{
SLRUCache
*
pTableMetaEntryCache
;
// 100 by default
uint64_t
metaFetch
;
uint64_t
cacheHit
;
}
STableMetaCacheInfo
;
typedef
struct
STableScanBase
{
STsdbReader
*
dataReader
;
SFileBlockLoadRecorder
readRecorder
;
SQueryTableDataCond
cond
;
SAggOptrPushDownInfo
pdInfo
;
SColMatchInfo
matchInfo
;
SReadHandle
readHandle
;
SExprSupp
pseudoSup
;
STableMetaCacheInfo
metaCache
;
int32_t
scanFlag
;
// table scan flag to denote if it is a repeat/reverse/main scan
int32_t
dataBlockLoadFlag
;
SLimitInfo
limitInfo
;
// there are more than one table list exists in one task, if only one vnode exists.
STableListInfo
*
pTableListInfo
;
}
STableScanBase
;
typedef
struct
STableScanInfo
{
STableScanBase
base
;
SScanInfo
scanInfo
;
int32_t
scanTimes
;
SSDataBlock
*
pResBlock
;
SSampleExecInfo
sample
;
// sample execution info
int32_t
currentGroupId
;
int32_t
currentTable
;
int8_t
scanMode
;
int8_t
assignBlockUid
;
bool
hasGroupByTag
;
bool
countOnly
;
}
STableScanInfo
;
typedef
struct
STableMergeScanInfo
{
int32_t
tableStartIndex
;
int32_t
tableEndIndex
;
bool
hasGroupId
;
uint64_t
groupId
;
SArray
*
queryConds
;
// array of queryTableDataCond
STableScanBase
base
;
int32_t
bufPageSize
;
uint32_t
sortBufSize
;
// max buffer size for in-memory sort
SArray
*
pSortInfo
;
SSortHandle
*
pSortHandle
;
SSDataBlock
*
pSortInputBlock
;
int64_t
startTs
;
// sort start time
SArray
*
sortSourceParams
;
SLimitInfo
limitInfo
;
int64_t
numOfRows
;
SScanInfo
scanInfo
;
int32_t
scanTimes
;
SSDataBlock
*
pResBlock
;
SSampleExecInfo
sample
;
// sample execution info
SSortExecInfo
sortExecInfo
;
}
STableMergeScanInfo
;
typedef
struct
STagScanInfo
{
SColumnInfo
*
pCols
;
SSDataBlock
*
pRes
;
SColMatchInfo
matchInfo
;
int32_t
curPos
;
SReadHandle
readHandle
;
STableListInfo
*
pTableListInfo
;
}
STagScanInfo
;
typedef
enum
EStreamScanMode
{
STREAM_SCAN_FROM_READERHANDLE
=
1
,
STREAM_SCAN_FROM_RES
,
STREAM_SCAN_FROM_UPDATERES
,
STREAM_SCAN_FROM_DELETE_DATA
,
STREAM_SCAN_FROM_DATAREADER_RETRIEVE
,
STREAM_SCAN_FROM_DATAREADER_RANGE
,
}
EStreamScanMode
;
enum
{
PROJECT_RETRIEVE_CONTINUE
=
0x1
,
PROJECT_RETRIEVE_DONE
=
0x2
,
};
typedef
struct
SStreamAggSupporter
{
int32_t
resultRowSize
;
// the result buffer size for each result row, with the meta data size for each row
SSDataBlock
*
pScanBlock
;
SStreamState
*
pState
;
int64_t
gap
;
// stream session window gap
SqlFunctionCtx
*
pDummyCtx
;
// for combine
SSHashObj
*
pResultRows
;
int32_t
stateKeySize
;
int16_t
stateKeyType
;
SDiskbasedBuf
*
pResultBuf
;
}
SStreamAggSupporter
;
typedef
struct
SWindowSupporter
{
SStreamAggSupporter
*
pStreamAggSup
;
int64_t
gap
;
uint16_t
parentType
;
SAggSupporter
*
pIntervalAggSup
;
}
SWindowSupporter
;
typedef
struct
SPartitionBySupporter
{
SArray
*
pGroupCols
;
// group by columns, SArray<SColumn>
SArray
*
pGroupColVals
;
// current group column values, SArray<SGroupKeys>
char
*
keyBuf
;
// group by keys for hash
bool
needCalc
;
// partition by column
}
SPartitionBySupporter
;
typedef
struct
SPartitionDataInfo
{
uint64_t
groupId
;
char
*
tbname
;
SArray
*
tags
;
SArray
*
rowIds
;
}
SPartitionDataInfo
;
typedef
struct
STimeWindowAggSupp
{
int8_t
calTrigger
;
int8_t
calTriggerSaved
;
int64_t
deleteMark
;
int64_t
deleteMarkSaved
;
int64_t
waterMark
;
TSKEY
maxTs
;
TSKEY
minTs
;
SColumnInfoData
timeWindowData
;
// query time window info for scalar function execution.
}
STimeWindowAggSupp
;
typedef
struct
SStreamScanInfo
{
SExprInfo
*
pPseudoExpr
;
int32_t
numOfPseudoExpr
;
SExprSupp
tbnameCalSup
;
SExprSupp
tagCalSup
;
int32_t
primaryTsIndex
;
// primary time stamp slot id
SReadHandle
readHandle
;
SInterval
interval
;
// if the upstream is an interval operator, the interval info is also kept here.
SColMatchInfo
matchInfo
;
SArray
*
pBlockLists
;
// multiple SSDatablock.
SSDataBlock
*
pRes
;
// result SSDataBlock
SSDataBlock
*
pUpdateRes
;
// update SSDataBlock
int32_t
updateResIndex
;
int32_t
blockType
;
// current block type
int32_t
validBlockIndex
;
// Is current data has returned?
uint64_t
numOfExec
;
// execution times
STqReader
*
tqReader
;
uint64_t
groupId
;
SUpdateInfo
*
pUpdateInfo
;
EStreamScanMode
scanMode
;
struct
SOperatorInfo
*
pStreamScanOp
;
struct
SOperatorInfo
*
pTableScanOp
;
SArray
*
childIds
;
SWindowSupporter
windowSup
;
SPartitionBySupporter
partitionSup
;
SExprSupp
*
pPartScalarSup
;
bool
assignBlockUid
;
// assign block uid to groupId, temporarily used for generating rollup SMA.
int32_t
scanWinIndex
;
// for state operator
int32_t
pullDataResIndex
;
SSDataBlock
*
pPullDataRes
;
// pull data SSDataBlock
SSDataBlock
*
pDeleteDataRes
;
// delete data SSDataBlock
int32_t
deleteDataIndex
;
STimeWindow
updateWin
;
STimeWindowAggSupp
twAggSup
;
SSDataBlock
*
pUpdateDataRes
;
// status for tmq
SNodeList
*
pGroupTags
;
SNode
*
pTagCond
;
SNode
*
pTagIndexCond
;
// recover
int32_t
blockRecoverContiCnt
;
int32_t
blockRecoverTotCnt
;
SSDataBlock
*
pRecoverRes
;
SSDataBlock
*
pCreateTbRes
;
int8_t
igCheckUpdate
;
int8_t
igExpired
;
}
SStreamScanInfo
;
typedef
struct
{
SVnode
*
vnode
;
SSDataBlock
pRes
;
// result SSDataBlock
STsdbReader
*
dataReader
;
SSnapContext
*
sContext
;
STableListInfo
*
pTableListInfo
;
}
SStreamRawScanInfo
;
typedef
struct
STableCountScanSupp
{
int16_t
dbNameSlotId
;
int16_t
stbNameSlotId
;
int16_t
tbCountSlotId
;
bool
groupByDbName
;
bool
groupByStbName
;
char
dbNameFilter
[
TSDB_DB_NAME_LEN
];
char
stbNameFilter
[
TSDB_TABLE_NAME_LEN
];
}
STableCountScanSupp
;
typedef
struct
SOptrBasicInfo
{
SResultRowInfo
resultRowInfo
;
SSDataBlock
*
pRes
;
bool
mergeResultBlock
;
}
SOptrBasicInfo
;
typedef
struct
SIntervalAggOperatorInfo
{
SOptrBasicInfo
binfo
;
// basic info
SAggSupporter
aggSup
;
// aggregate supporter
SExprSupp
scalarSupp
;
// supporter for perform scalar function
SGroupResInfo
groupResInfo
;
// multiple results build supporter
SInterval
interval
;
// interval info
int32_t
primaryTsIndex
;
// primary time stamp slot id from result of downstream operator.
STimeWindow
win
;
// query time range
bool
timeWindowInterpo
;
// interpolation needed or not
SArray
*
pInterpCols
;
// interpolation columns
int32_t
resultTsOrder
;
// result timestamp order
int32_t
inputOrder
;
// input data ts order
EOPTR_EXEC_MODEL
execModel
;
// operator execution model [batch model|stream model]
STimeWindowAggSupp
twAggSup
;
SArray
*
pPrevValues
;
// SArray<SGroupKeys> used to keep the previous not null value for interpolation.
}
SIntervalAggOperatorInfo
;
typedef
struct
SMergeAlignedIntervalAggOperatorInfo
{
SIntervalAggOperatorInfo
*
intervalAggOperatorInfo
;
uint64_t
groupId
;
// current groupId
int64_t
curTs
;
// current ts
SSDataBlock
*
prefetchedBlock
;
SResultRow
*
pResultRow
;
}
SMergeAlignedIntervalAggOperatorInfo
;
typedef
struct
SStreamIntervalOperatorInfo
{
SOptrBasicInfo
binfo
;
// basic info
SAggSupporter
aggSup
;
// aggregate supporter
SExprSupp
scalarSupp
;
// supporter for perform scalar function
SGroupResInfo
groupResInfo
;
// multiple results build supporter
SInterval
interval
;
// interval info
int32_t
primaryTsIndex
;
// primary time stamp slot id from result of downstream operator.
STimeWindowAggSupp
twAggSup
;
bool
invertible
;
bool
ignoreExpiredData
;
bool
ignoreExpiredDataSaved
;
SArray
*
pDelWins
;
// SWinRes
int32_t
delIndex
;
SSDataBlock
*
pDelRes
;
SPhysiNode
*
pPhyNode
;
// create new child
SHashObj
*
pPullDataMap
;
SArray
*
pPullWins
;
// SPullWindowInfo
int32_t
pullIndex
;
SSDataBlock
*
pPullDataRes
;
bool
isFinal
;
SArray
*
pChildren
;
SStreamState
*
pState
;
SWinKey
delKey
;
uint64_t
numOfDatapack
;
SArray
*
pUpdated
;
SSHashObj
*
pUpdatedMap
;
int64_t
dataVersion
;
}
SStreamIntervalOperatorInfo
;
typedef
struct
SDataGroupInfo
{
uint64_t
groupId
;
int64_t
numOfRows
;
SArray
*
pPageList
;
}
SDataGroupInfo
;
typedef
struct
SWindowRowsSup
{
STimeWindow
win
;
TSKEY
prevTs
;
int32_t
startRowIndex
;
int32_t
numOfRows
;
uint64_t
groupId
;
}
SWindowRowsSup
;
typedef
struct
SResultWindowInfo
{
void
*
pOutputBuf
;
SSessionKey
sessionWin
;
bool
isOutput
;
}
SResultWindowInfo
;
typedef
struct
SStreamSessionAggOperatorInfo
{
SOptrBasicInfo
binfo
;
SStreamAggSupporter
streamAggSup
;
SExprSupp
scalarSupp
;
// supporter for perform scalar function
SGroupResInfo
groupResInfo
;
int32_t
primaryTsIndex
;
// primary timestamp slot id
int32_t
endTsIndex
;
// window end timestamp slot id
int32_t
order
;
// current SSDataBlock scan order
STimeWindowAggSupp
twAggSup
;
SSDataBlock
*
pWinBlock
;
// window result
SSDataBlock
*
pDelRes
;
// delete result
SSDataBlock
*
pUpdateRes
;
// update window
bool
returnUpdate
;
SSHashObj
*
pStDeleted
;
void
*
pDelIterator
;
SArray
*
pChildren
;
// cache for children's result; final stream operator
SPhysiNode
*
pPhyNode
;
// create new child
bool
isFinal
;
bool
ignoreExpiredData
;
bool
ignoreExpiredDataSaved
;
SArray
*
pUpdated
;
SSHashObj
*
pStUpdated
;
int64_t
dataVersion
;
}
SStreamSessionAggOperatorInfo
;
typedef
struct
SStreamStateAggOperatorInfo
{
SOptrBasicInfo
binfo
;
SStreamAggSupporter
streamAggSup
;
SExprSupp
scalarSupp
;
// supporter for perform scalar function
SGroupResInfo
groupResInfo
;
int32_t
primaryTsIndex
;
// primary timestamp slot id
STimeWindowAggSupp
twAggSup
;
SColumn
stateCol
;
SSDataBlock
*
pDelRes
;
SSHashObj
*
pSeDeleted
;
void
*
pDelIterator
;
SArray
*
pChildren
;
// cache for children's result;
bool
ignoreExpiredData
;
bool
ignoreExpiredDataSaved
;
SArray
*
pUpdated
;
SSHashObj
*
pSeUpdated
;
int64_t
dataVersion
;
}
SStreamStateAggOperatorInfo
;
typedef
struct
SStreamPartitionOperatorInfo
{
SOptrBasicInfo
binfo
;
SPartitionBySupporter
partitionSup
;
SExprSupp
scalarSup
;
SExprSupp
tbnameCalSup
;
SExprSupp
tagCalSup
;
SHashObj
*
pPartitions
;
void
*
parIte
;
void
*
pTbNameIte
;
SSDataBlock
*
pInputDataBlock
;
int32_t
tsColIndex
;
SSDataBlock
*
pDelRes
;
SSDataBlock
*
pCreateTbRes
;
}
SStreamPartitionOperatorInfo
;
typedef
struct
SStreamFillSupporter
{
int32_t
type
;
// fill type
SInterval
interval
;
SResultRowData
prev
;
SResultRowData
cur
;
SResultRowData
next
;
SResultRowData
nextNext
;
SFillColInfo
*
pAllColInfo
;
// fill exprs and not fill exprs
SExprSupp
notFillExprSup
;
int32_t
numOfAllCols
;
// number of all exprs, including the tags columns
int32_t
numOfFillCols
;
int32_t
numOfNotFillCols
;
int32_t
rowSize
;
SSHashObj
*
pResMap
;
bool
hasDelete
;
}
SStreamFillSupporter
;
typedef
struct
SStreamFillOperatorInfo
{
SStreamFillSupporter
*
pFillSup
;
SSDataBlock
*
pRes
;
SSDataBlock
*
pSrcBlock
;
int32_t
srcRowIndex
;
SSDataBlock
*
pSrcDelBlock
;
int32_t
srcDelRowIndex
;
SSDataBlock
*
pDelRes
;
SColMatchInfo
matchInfo
;
int32_t
primaryTsCol
;
int32_t
primarySrcSlotId
;
SStreamFillInfo
*
pFillInfo
;
}
SStreamFillOperatorInfo
;
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
SSchemaWrapper
*
extractQueriedColumnSchema
(
SScanPhysiNode
*
pScanNode
);
int32_t
initQueriedTableSchemaInfo
(
SReadHandle
*
pHandle
,
SScanPhysiNode
*
pScanNode
,
const
char
*
dbName
,
SExecTaskInfo
*
pTaskInfo
);
void
cleanupQueriedTableScanInfo
(
SSchemaInfo
*
pSchemaInfo
);
void
initBasicInfo
(
SOptrBasicInfo
*
pInfo
,
SSDataBlock
*
pBlock
);
void
cleanupBasicInfo
(
SOptrBasicInfo
*
pInfo
);
int32_t
initExprSupp
(
SExprSupp
*
pSup
,
SExprInfo
*
pExprInfo
,
int32_t
numOfExpr
);
void
cleanupExprSupp
(
SExprSupp
*
pSup
);
void
destroyExprInfo
(
SExprInfo
*
pExpr
,
int32_t
numOfExprs
);
int32_t
initAggSup
(
SExprSupp
*
pSup
,
SAggSupporter
*
pAggSup
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
size_t
keyBufSize
,
const
char
*
pkey
,
void
*
pState
);
void
cleanupAggSup
(
SAggSupporter
*
pAggSup
);
void
initResultSizeInfo
(
SResultInfo
*
pResultInfo
,
int32_t
numOfRows
);
void
doBuildStreamResBlock
(
struct
SOperatorInfo
*
pOperator
,
SOptrBasicInfo
*
pbInfo
,
SGroupResInfo
*
pGroupResInfo
,
SDiskbasedBuf
*
pBuf
);
void
doBuildResultDatablock
(
struct
SOperatorInfo
*
pOperator
,
SOptrBasicInfo
*
pbInfo
,
SGroupResInfo
*
pGroupResInfo
,
SDiskbasedBuf
*
pBuf
);
bool
hasLimitOffsetInfo
(
SLimitInfo
*
pLimitInfo
);
bool
hasSlimitOffsetInfo
(
SLimitInfo
*
pLimitInfo
);
void
initLimitInfo
(
const
SNode
*
pLimit
,
const
SNode
*
pSLimit
,
SLimitInfo
*
pLimitInfo
);
void
resetLimitInfoForNextGroup
(
SLimitInfo
*
pLimitInfo
);
bool
applyLimitOffset
(
SLimitInfo
*
pLimitInfo
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
);
void
applyAggFunctionOnPartialTuples
(
SExecTaskInfo
*
taskInfo
,
SqlFunctionCtx
*
pCtx
,
SColumnInfoData
*
pTimeWindowData
,
int32_t
offset
,
int32_t
forwardStep
,
int32_t
numOfTotal
,
int32_t
numOfOutput
);
int32_t
extractDataBlockFromFetchRsp
(
SSDataBlock
*
pRes
,
char
*
pData
,
SArray
*
pColList
,
char
**
pNextStart
);
void
updateLoadRemoteInfo
(
SLoadRemoteDataInfo
*
pInfo
,
int64_t
numOfRows
,
int32_t
dataLen
,
int64_t
startTs
,
struct
SOperatorInfo
*
pOperator
);
STimeWindow
getFirstQualifiedTimeWindow
(
int64_t
ts
,
STimeWindow
*
pWindow
,
SInterval
*
pInterval
,
int32_t
order
);
int32_t
getBufferPgSize
(
int32_t
rowSize
,
uint32_t
*
defaultPgsz
,
uint32_t
*
defaultBufsz
);
extern
void
doDestroyExchangeOperatorInfo
(
void
*
param
);
void
doFilter
(
SSDataBlock
*
pBlock
,
SFilterInfo
*
pFilterInfo
,
SColMatchInfo
*
pColMatchInfo
);
int32_t
addTagPseudoColumnData
(
SReadHandle
*
pHandle
,
const
SExprInfo
*
pExpr
,
int32_t
numOfExpr
,
SSDataBlock
*
pBlock
,
int32_t
rows
,
const
char
*
idStr
,
STableMetaCacheInfo
*
pCache
);
void
appendOneRowToDataBlock
(
SSDataBlock
*
pBlock
,
STupleHandle
*
pTupleHandle
);
void
setTbNameColData
(
const
SSDataBlock
*
pBlock
,
SColumnInfoData
*
pColInfoData
,
int32_t
functionId
,
const
char
*
name
);
void
setResultRowInitCtx
(
SResultRow
*
pResult
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
int32_t
*
rowEntryInfoOffset
);
void
clearResultRowInitFlag
(
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
);
SResultRow
*
doSetResultOutBufByKey
(
SDiskbasedBuf
*
pResultBuf
,
SResultRowInfo
*
pResultRowInfo
,
char
*
pData
,
int16_t
bytes
,
bool
masterscan
,
uint64_t
groupId
,
SExecTaskInfo
*
pTaskInfo
,
bool
isIntervalQuery
,
SAggSupporter
*
pSup
,
bool
keepGroup
);
int32_t
projectApplyFunctions
(
SExprInfo
*
pExpr
,
SSDataBlock
*
pResult
,
SSDataBlock
*
pSrcBlock
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
SArray
*
pPseudoList
);
void
setInputDataBlock
(
SExprSupp
*
pExprSupp
,
SSDataBlock
*
pBlock
,
int32_t
order
,
int32_t
scanFlag
,
bool
createDummyCol
);
int32_t
checkForQueryBuf
(
size_t
numOfTables
);
int32_t
createDataSinkParam
(
SDataSinkNode
*
pNode
,
void
**
pParam
,
SExecTaskInfo
*
pTask
,
SReadHandle
*
readHandle
);
STimeWindow
getActiveTimeWindow
(
SDiskbasedBuf
*
pBuf
,
SResultRowInfo
*
pResultRowInfo
,
int64_t
ts
,
SInterval
*
pInterval
,
int32_t
order
);
int32_t
getNumOfRowsInTimeWindow
(
SDataBlockInfo
*
pDataBlockInfo
,
TSKEY
*
pPrimaryColumn
,
int32_t
startPos
,
TSKEY
ekey
,
__block_search_fn_t
searchFn
,
STableQueryInfo
*
item
,
int32_t
order
);
int32_t
binarySearchForKey
(
char
*
pValue
,
int
num
,
TSKEY
key
,
int
order
);
SResultRow
*
getNewResultRow
(
SDiskbasedBuf
*
pResultBuf
,
int32_t
*
currentPageId
,
int32_t
interBufSize
);
void
getCurSessionWindow
(
SStreamAggSupporter
*
pAggSup
,
TSKEY
startTs
,
TSKEY
endTs
,
uint64_t
groupId
,
SSessionKey
*
pKey
);
bool
isInTimeWindow
(
STimeWindow
*
pWin
,
TSKEY
ts
,
int64_t
gap
);
bool
functionNeedToExecute
(
SqlFunctionCtx
*
pCtx
);
bool
isOverdue
(
TSKEY
ts
,
STimeWindowAggSupp
*
pSup
);
bool
isCloseWindow
(
STimeWindow
*
pWin
,
STimeWindowAggSupp
*
pSup
);
bool
isDeletedStreamWindow
(
STimeWindow
*
pWin
,
uint64_t
groupId
,
SStreamState
*
pState
,
STimeWindowAggSupp
*
pTwSup
);
void
appendOneRowToStreamSpecialBlock
(
SSDataBlock
*
pBlock
,
TSKEY
*
pStartTs
,
TSKEY
*
pEndTs
,
uint64_t
*
pUid
,
uint64_t
*
pGp
,
void
*
pTbName
);
uint64_t
calGroupIdByData
(
SPartitionBySupporter
*
pParSup
,
SExprSupp
*
pExprSup
,
SSDataBlock
*
pBlock
,
int32_t
rowId
);
int32_t
finalizeResultRows
(
SDiskbasedBuf
*
pBuf
,
SResultRowPosition
*
resultRowPosition
,
SExprSupp
*
pSup
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
);
bool
groupbyTbname
(
SNodeList
*
pGroupList
);
int32_t
buildDataBlockFromGroupRes
(
struct
SOperatorInfo
*
pOperator
,
SStreamState
*
pState
,
SSDataBlock
*
pBlock
,
SExprSupp
*
pSup
,
SGroupResInfo
*
pGroupResInfo
);
int32_t
saveSessionDiscBuf
(
SStreamState
*
pState
,
SSessionKey
*
key
,
void
*
buf
,
int32_t
size
);
int32_t
buildSessionResultDataBlock
(
struct
SOperatorInfo
*
pOperator
,
SStreamState
*
pState
,
SSDataBlock
*
pBlock
,
SExprSupp
*
pSup
,
SGroupResInfo
*
pGroupResInfo
);
int32_t
setOutputBuf
(
SStreamState
*
pState
,
STimeWindow
*
win
,
SResultRow
**
pResult
,
int64_t
tableGroupId
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
int32_t
*
rowEntryInfoOffset
,
SAggSupporter
*
pAggSup
);
int32_t
releaseOutputBuf
(
SStreamState
*
pState
,
SWinKey
*
pKey
,
SResultRow
*
pResult
);
int32_t
saveOutputBuf
(
SStreamState
*
pState
,
SWinKey
*
pKey
,
SResultRow
*
pResult
,
int32_t
resSize
);
void
getNextIntervalWindow
(
SInterval
*
pInterval
,
STimeWindow
*
tw
,
int32_t
order
);
int32_t
getForwardStepsInBlock
(
int32_t
numOfRows
,
__block_search_fn_t
searchFn
,
TSKEY
ekey
,
int32_t
pos
,
int32_t
order
,
int64_t
*
pData
);
void
appendCreateTableRow
(
SStreamState
*
pState
,
SExprSupp
*
pTableSup
,
SExprSupp
*
pTagSup
,
uint64_t
groupId
,
SSDataBlock
*
pSrcBlock
,
int32_t
rowId
,
SSDataBlock
*
pDestBlock
);
SSDataBlock
*
buildCreateTableBlock
(
SExprSupp
*
tbName
,
SExprSupp
*
tag
);
SExprInfo
*
createExpr
(
SNodeList
*
pNodeList
,
int32_t
*
numOfExprs
);
void
copyResultrowToDataBlock
(
SExprInfo
*
pExprInfo
,
int32_t
numOfExprs
,
SResultRow
*
pRow
,
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
,
const
int32_t
*
rowEntryOffset
,
SExecTaskInfo
*
pTaskInfo
);
void
doUpdateNumOfRows
(
SqlFunctionCtx
*
pCtx
,
SResultRow
*
pRow
,
int32_t
numOfExprs
,
const
int32_t
*
rowEntryOffset
);
void
doClearBufferedBlocks
(
SStreamScanInfo
*
pInfo
);
uint64_t
calcGroupId
(
char
*
pData
,
int32_t
len
);
uint64_t
calcGroupId
(
char
*
pData
,
int32_t
len
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
#endif
#endif
#endif
/*_TD_EXECUTOR_INT_H*/
#endif // TDENGINE_EXECUTORINT_H
\ No newline at end of file
source/libs/executor/inc/executorimpl.h
已删除
100644 → 0
浏览文件 @
f850388e
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_EXECUTORIMPL_H
#define TDENGINE_EXECUTORIMPL_H
#ifdef __cplusplus
extern
"C"
{
#endif
#include "os.h"
#include "tcommon.h"
#include "tlosertree.h"
#include "tsort.h"
#include "ttszip.h"
#include "tvariant.h"
#include "dataSinkMgt.h"
#include "executil.h"
#include "executor.h"
#include "planner.h"
#include "scalar.h"
#include "taosdef.h"
#include "tarray.h"
#include "tfill.h"
#include "thash.h"
#include "tlockfree.h"
#include "tmsg.h"
#include "tpagedbuf.h"
#include "tstream.h"
#include "tstreamUpdate.h"
#include "executorInt.h"
#include "vnode.h"
typedef
int32_t
(
*
__block_search_fn_t
)(
char
*
data
,
int32_t
num
,
int64_t
key
,
int32_t
order
);
#define IS_VALID_SESSION_WIN(winInfo) ((winInfo).sessionWin.win.skey > 0)
#define SET_SESSION_WIN_INVALID(winInfo) ((winInfo).sessionWin.win.skey = INT64_MIN)
#define IS_INVALID_SESSION_WIN_KEY(winKey) ((winKey).win.skey <= 0)
#define SET_SESSION_WIN_KEY_INVALID(pWinKey) ((pWinKey)->win.skey = INT64_MIN)
enum
{
// when this task starts to execute, this status will set
TASK_NOT_COMPLETED
=
0x1u
,
/* Task is over
* 1. this status is used in one row result query process, e.g., count/sum/first/last/ avg...etc.
* 2. when all data within queried time window, it is also denoted as query_completed
*/
TASK_COMPLETED
=
0x2u
,
};
/**
* If the number of generated results is greater than this value,
* query query will be halt and return results to client immediate.
*/
typedef
struct
SResultInfo
{
// TODO refactor
int64_t
totalRows
;
// total generated result size in rows
int64_t
totalBytes
;
// total results in bytes.
int32_t
capacity
;
// capacity of current result output buffer
int32_t
threshold
;
// result size threshold in rows.
}
SResultInfo
;
typedef
struct
STableQueryInfo
{
TSKEY
lastKey
;
// last check ts, todo remove it later
SResultRowPosition
pos
;
// current active time window
}
STableQueryInfo
;
typedef
struct
SLimit
{
int64_t
limit
;
int64_t
offset
;
}
SLimit
;
typedef
struct
STableScanAnalyzeInfo
SFileBlockLoadRecorder
;
enum
{
STREAM_RECOVER_STEP__NONE
=
0
,
STREAM_RECOVER_STEP__PREPARE1
,
STREAM_RECOVER_STEP__PREPARE2
,
STREAM_RECOVER_STEP__SCAN1
,
STREAM_RECOVER_STEP__SCAN2
,
};
typedef
struct
{
STqOffsetVal
currentOffset
;
// for tmq
SMqMetaRsp
metaRsp
;
// for tmq fetching meta
int64_t
snapshotVer
;
SPackedData
submit
;
SSchemaWrapper
*
schema
;
char
tbName
[
TSDB_TABLE_NAME_LEN
];
int8_t
recoverStep
;
int8_t
recoverScanFinished
;
SQueryTableDataCond
tableCond
;
int64_t
fillHistoryVer1
;
int64_t
fillHistoryVer2
;
SStreamState
*
pState
;
int64_t
dataVersion
;
int64_t
checkPointId
;
}
SStreamTaskInfo
;
typedef
struct
{
char
*
tablename
;
char
*
dbname
;
int32_t
tversion
;
SSchemaWrapper
*
sw
;
SSchemaWrapper
*
qsw
;
}
SSchemaInfo
;
typedef
struct
SExchangeOpStopInfo
{
int32_t
operatorType
;
int64_t
refId
;
}
SExchangeOpStopInfo
;
typedef
struct
SExprSupp
{
SExprInfo
*
pExprInfo
;
int32_t
numOfExprs
;
// the number of scalar expression in group operator
SqlFunctionCtx
*
pCtx
;
int32_t
*
rowEntryInfoOffset
;
// offset value for each row result cell info
SFilterInfo
*
pFilterInfo
;
}
SExprSupp
;
typedef
enum
{
EX_SOURCE_DATA_NOT_READY
=
0x1
,
EX_SOURCE_DATA_READY
=
0x2
,
EX_SOURCE_DATA_EXHAUSTED
=
0x3
,
}
EX_SOURCE_STATUS
;
#define COL_MATCH_FROM_COL_ID 0x1
#define COL_MATCH_FROM_SLOT_ID 0x2
typedef
struct
SLoadRemoteDataInfo
{
uint64_t
totalSize
;
// total load bytes from remote
uint64_t
totalRows
;
// total number of rows
uint64_t
totalElapsed
;
// total elapsed time
}
SLoadRemoteDataInfo
;
typedef
struct
SLimitInfo
{
SLimit
limit
;
SLimit
slimit
;
uint64_t
currentGroupId
;
int64_t
remainGroupOffset
;
int64_t
numOfOutputGroups
;
int64_t
remainOffset
;
int64_t
numOfOutputRows
;
}
SLimitInfo
;
typedef
struct
SExchangeInfo
{
SArray
*
pSources
;
SArray
*
pSourceDataInfo
;
tsem_t
ready
;
void
*
pTransporter
;
// SArray<SSDataBlock*>, result block list, used to keep the multi-block that
// passed by downstream operator
SArray
*
pResultBlockList
;
SArray
*
pRecycledBlocks
;
// build a pool for small data block to avoid to repeatly create and then destroy.
SSDataBlock
*
pDummyBlock
;
// dummy block, not keep data
bool
seqLoadData
;
// sequential load data or not, false by default
int32_t
current
;
SLoadRemoteDataInfo
loadInfo
;
uint64_t
self
;
SLimitInfo
limitInfo
;
int64_t
openedTs
;
// start exec time stamp, todo: move to SLoadRemoteDataInfo
}
SExchangeInfo
;
typedef
struct
SScanInfo
{
int32_t
numOfAsc
;
int32_t
numOfDesc
;
}
SScanInfo
;
typedef
struct
SSampleExecInfo
{
double
sampleRatio
;
// data block sample ratio, 1 by default
uint32_t
seed
;
// random seed value
}
SSampleExecInfo
;
enum
{
TABLE_SCAN__TABLE_ORDER
=
1
,
TABLE_SCAN__BLOCK_ORDER
=
2
,
};
typedef
struct
SAggSupporter
{
SSHashObj
*
pResultRowHashTable
;
// quick locate the window object for each result
char
*
keyBuf
;
// window key buffer
SDiskbasedBuf
*
pResultBuf
;
// query result buffer based on blocked-wised disk file
int32_t
resultRowSize
;
// the result buffer size for each result row, with the meta data size for each row
int32_t
currentPageId
;
// current write page id
}
SAggSupporter
;
typedef
struct
{
// if the upstream is an interval operator, the interval info is also kept here to get the time window to check if
// current data block needs to be loaded.
SInterval
interval
;
SAggSupporter
*
pAggSup
;
SExprSupp
*
pExprSup
;
// expr supporter of aggregate operator
}
SAggOptrPushDownInfo
;
typedef
struct
STableMetaCacheInfo
{
SLRUCache
*
pTableMetaEntryCache
;
// 100 by default
uint64_t
metaFetch
;
uint64_t
cacheHit
;
}
STableMetaCacheInfo
;
typedef
struct
STableScanBase
{
STsdbReader
*
dataReader
;
SFileBlockLoadRecorder
readRecorder
;
SQueryTableDataCond
cond
;
SAggOptrPushDownInfo
pdInfo
;
SColMatchInfo
matchInfo
;
SReadHandle
readHandle
;
SExprSupp
pseudoSup
;
STableMetaCacheInfo
metaCache
;
int32_t
scanFlag
;
// table scan flag to denote if it is a repeat/reverse/main scan
int32_t
dataBlockLoadFlag
;
SLimitInfo
limitInfo
;
// there are more than one table list exists in one task, if only one vnode exists.
STableListInfo
*
pTableListInfo
;
}
STableScanBase
;
typedef
struct
STableScanInfo
{
STableScanBase
base
;
SScanInfo
scanInfo
;
int32_t
scanTimes
;
SSDataBlock
*
pResBlock
;
SSampleExecInfo
sample
;
// sample execution info
int32_t
currentGroupId
;
int32_t
currentTable
;
int8_t
scanMode
;
int8_t
assignBlockUid
;
bool
hasGroupByTag
;
bool
countOnly
;
}
STableScanInfo
;
typedef
struct
STableMergeScanInfo
{
int32_t
tableStartIndex
;
int32_t
tableEndIndex
;
bool
hasGroupId
;
uint64_t
groupId
;
SArray
*
queryConds
;
// array of queryTableDataCond
STableScanBase
base
;
int32_t
bufPageSize
;
uint32_t
sortBufSize
;
// max buffer size for in-memory sort
SArray
*
pSortInfo
;
SSortHandle
*
pSortHandle
;
SSDataBlock
*
pSortInputBlock
;
int64_t
startTs
;
// sort start time
SArray
*
sortSourceParams
;
SLimitInfo
limitInfo
;
int64_t
numOfRows
;
SScanInfo
scanInfo
;
int32_t
scanTimes
;
SSDataBlock
*
pResBlock
;
SSampleExecInfo
sample
;
// sample execution info
SSortExecInfo
sortExecInfo
;
}
STableMergeScanInfo
;
typedef
struct
STagScanInfo
{
SColumnInfo
*
pCols
;
SSDataBlock
*
pRes
;
SColMatchInfo
matchInfo
;
int32_t
curPos
;
SReadHandle
readHandle
;
STableListInfo
*
pTableListInfo
;
}
STagScanInfo
;
typedef
enum
EStreamScanMode
{
STREAM_SCAN_FROM_READERHANDLE
=
1
,
STREAM_SCAN_FROM_RES
,
STREAM_SCAN_FROM_UPDATERES
,
STREAM_SCAN_FROM_DELETE_DATA
,
STREAM_SCAN_FROM_DATAREADER_RETRIEVE
,
STREAM_SCAN_FROM_DATAREADER_RANGE
,
}
EStreamScanMode
;
enum
{
PROJECT_RETRIEVE_CONTINUE
=
0x1
,
PROJECT_RETRIEVE_DONE
=
0x2
,
};
typedef
struct
SStreamAggSupporter
{
int32_t
resultRowSize
;
// the result buffer size for each result row, with the meta data size for each row
SSDataBlock
*
pScanBlock
;
SStreamState
*
pState
;
int64_t
gap
;
// stream session window gap
SqlFunctionCtx
*
pDummyCtx
;
// for combine
SSHashObj
*
pResultRows
;
int32_t
stateKeySize
;
int16_t
stateKeyType
;
SDiskbasedBuf
*
pResultBuf
;
}
SStreamAggSupporter
;
typedef
struct
SWindowSupporter
{
SStreamAggSupporter
*
pStreamAggSup
;
int64_t
gap
;
uint16_t
parentType
;
SAggSupporter
*
pIntervalAggSup
;
}
SWindowSupporter
;
typedef
struct
SPartitionBySupporter
{
SArray
*
pGroupCols
;
// group by columns, SArray<SColumn>
SArray
*
pGroupColVals
;
// current group column values, SArray<SGroupKeys>
char
*
keyBuf
;
// group by keys for hash
bool
needCalc
;
// partition by column
}
SPartitionBySupporter
;
typedef
struct
SPartitionDataInfo
{
uint64_t
groupId
;
char
*
tbname
;
SArray
*
tags
;
SArray
*
rowIds
;
}
SPartitionDataInfo
;
typedef
struct
STimeWindowAggSupp
{
int8_t
calTrigger
;
int8_t
calTriggerSaved
;
int64_t
deleteMark
;
int64_t
deleteMarkSaved
;
int64_t
waterMark
;
TSKEY
maxTs
;
TSKEY
minTs
;
SColumnInfoData
timeWindowData
;
// query time window info for scalar function execution.
}
STimeWindowAggSupp
;
typedef
struct
SStreamScanInfo
{
SExprInfo
*
pPseudoExpr
;
int32_t
numOfPseudoExpr
;
SExprSupp
tbnameCalSup
;
SExprSupp
tagCalSup
;
int32_t
primaryTsIndex
;
// primary time stamp slot id
SReadHandle
readHandle
;
SInterval
interval
;
// if the upstream is an interval operator, the interval info is also kept here.
SColMatchInfo
matchInfo
;
SArray
*
pBlockLists
;
// multiple SSDatablock.
SSDataBlock
*
pRes
;
// result SSDataBlock
SSDataBlock
*
pUpdateRes
;
// update SSDataBlock
int32_t
updateResIndex
;
int32_t
blockType
;
// current block type
int32_t
validBlockIndex
;
// Is current data has returned?
uint64_t
numOfExec
;
// execution times
STqReader
*
tqReader
;
uint64_t
groupId
;
SUpdateInfo
*
pUpdateInfo
;
EStreamScanMode
scanMode
;
struct
SOperatorInfo
*
pStreamScanOp
;
struct
SOperatorInfo
*
pTableScanOp
;
SArray
*
childIds
;
SWindowSupporter
windowSup
;
SPartitionBySupporter
partitionSup
;
SExprSupp
*
pPartScalarSup
;
bool
assignBlockUid
;
// assign block uid to groupId, temporarily used for generating rollup SMA.
int32_t
scanWinIndex
;
// for state operator
int32_t
pullDataResIndex
;
SSDataBlock
*
pPullDataRes
;
// pull data SSDataBlock
SSDataBlock
*
pDeleteDataRes
;
// delete data SSDataBlock
int32_t
deleteDataIndex
;
STimeWindow
updateWin
;
STimeWindowAggSupp
twAggSup
;
SSDataBlock
*
pUpdateDataRes
;
// status for tmq
SNodeList
*
pGroupTags
;
SNode
*
pTagCond
;
SNode
*
pTagIndexCond
;
// recover
int32_t
blockRecoverContiCnt
;
int32_t
blockRecoverTotCnt
;
SSDataBlock
*
pRecoverRes
;
SSDataBlock
*
pCreateTbRes
;
int8_t
igCheckUpdate
;
int8_t
igExpired
;
}
SStreamScanInfo
;
typedef
struct
{
SVnode
*
vnode
;
SSDataBlock
pRes
;
// result SSDataBlock
STsdbReader
*
dataReader
;
SSnapContext
*
sContext
;
STableListInfo
*
pTableListInfo
;
}
SStreamRawScanInfo
;
typedef
struct
STableCountScanSupp
{
int16_t
dbNameSlotId
;
int16_t
stbNameSlotId
;
int16_t
tbCountSlotId
;
bool
groupByDbName
;
bool
groupByStbName
;
char
dbNameFilter
[
TSDB_DB_NAME_LEN
];
char
stbNameFilter
[
TSDB_TABLE_NAME_LEN
];
}
STableCountScanSupp
;
typedef
struct
SOptrBasicInfo
{
SResultRowInfo
resultRowInfo
;
SSDataBlock
*
pRes
;
bool
mergeResultBlock
;
}
SOptrBasicInfo
;
typedef
struct
SIntervalAggOperatorInfo
{
SOptrBasicInfo
binfo
;
// basic info
SAggSupporter
aggSup
;
// aggregate supporter
SExprSupp
scalarSupp
;
// supporter for perform scalar function
SGroupResInfo
groupResInfo
;
// multiple results build supporter
SInterval
interval
;
// interval info
int32_t
primaryTsIndex
;
// primary time stamp slot id from result of downstream operator.
STimeWindow
win
;
// query time range
bool
timeWindowInterpo
;
// interpolation needed or not
SArray
*
pInterpCols
;
// interpolation columns
int32_t
resultTsOrder
;
// result timestamp order
int32_t
inputOrder
;
// input data ts order
EOPTR_EXEC_MODEL
execModel
;
// operator execution model [batch model|stream model]
STimeWindowAggSupp
twAggSup
;
SArray
*
pPrevValues
;
// SArray<SGroupKeys> used to keep the previous not null value for interpolation.
}
SIntervalAggOperatorInfo
;
typedef
struct
SMergeAlignedIntervalAggOperatorInfo
{
SIntervalAggOperatorInfo
*
intervalAggOperatorInfo
;
uint64_t
groupId
;
// current groupId
int64_t
curTs
;
// current ts
SSDataBlock
*
prefetchedBlock
;
SResultRow
*
pResultRow
;
}
SMergeAlignedIntervalAggOperatorInfo
;
typedef
struct
SStreamIntervalOperatorInfo
{
SOptrBasicInfo
binfo
;
// basic info
SAggSupporter
aggSup
;
// aggregate supporter
SExprSupp
scalarSupp
;
// supporter for perform scalar function
SGroupResInfo
groupResInfo
;
// multiple results build supporter
SInterval
interval
;
// interval info
int32_t
primaryTsIndex
;
// primary time stamp slot id from result of downstream operator.
STimeWindowAggSupp
twAggSup
;
bool
invertible
;
bool
ignoreExpiredData
;
bool
ignoreExpiredDataSaved
;
SArray
*
pDelWins
;
// SWinRes
int32_t
delIndex
;
SSDataBlock
*
pDelRes
;
SPhysiNode
*
pPhyNode
;
// create new child
SHashObj
*
pPullDataMap
;
SArray
*
pPullWins
;
// SPullWindowInfo
int32_t
pullIndex
;
SSDataBlock
*
pPullDataRes
;
bool
isFinal
;
SArray
*
pChildren
;
SStreamState
*
pState
;
SWinKey
delKey
;
uint64_t
numOfDatapack
;
SArray
*
pUpdated
;
SSHashObj
*
pUpdatedMap
;
int64_t
dataVersion
;
}
SStreamIntervalOperatorInfo
;
typedef
struct
SDataGroupInfo
{
uint64_t
groupId
;
int64_t
numOfRows
;
SArray
*
pPageList
;
}
SDataGroupInfo
;
typedef
struct
SWindowRowsSup
{
STimeWindow
win
;
TSKEY
prevTs
;
int32_t
startRowIndex
;
int32_t
numOfRows
;
uint64_t
groupId
;
}
SWindowRowsSup
;
typedef
struct
SResultWindowInfo
{
void
*
pOutputBuf
;
SSessionKey
sessionWin
;
bool
isOutput
;
}
SResultWindowInfo
;
typedef
struct
SStateWindowInfo
{
SResultWindowInfo
winInfo
;
SStateKeys
*
pStateKey
;
}
SStateWindowInfo
;
typedef
struct
SStreamSessionAggOperatorInfo
{
SOptrBasicInfo
binfo
;
SStreamAggSupporter
streamAggSup
;
SExprSupp
scalarSupp
;
// supporter for perform scalar function
SGroupResInfo
groupResInfo
;
int32_t
primaryTsIndex
;
// primary timestamp slot id
int32_t
endTsIndex
;
// window end timestamp slot id
int32_t
order
;
// current SSDataBlock scan order
STimeWindowAggSupp
twAggSup
;
SSDataBlock
*
pWinBlock
;
// window result
SSDataBlock
*
pDelRes
;
// delete result
SSDataBlock
*
pUpdateRes
;
// update window
bool
returnUpdate
;
SSHashObj
*
pStDeleted
;
void
*
pDelIterator
;
SArray
*
pChildren
;
// cache for children's result; final stream operator
SPhysiNode
*
pPhyNode
;
// create new child
bool
isFinal
;
bool
ignoreExpiredData
;
bool
ignoreExpiredDataSaved
;
SArray
*
pUpdated
;
SSHashObj
*
pStUpdated
;
int64_t
dataVersion
;
}
SStreamSessionAggOperatorInfo
;
typedef
struct
SStreamStateAggOperatorInfo
{
SOptrBasicInfo
binfo
;
SStreamAggSupporter
streamAggSup
;
SExprSupp
scalarSupp
;
// supporter for perform scalar function
SGroupResInfo
groupResInfo
;
int32_t
primaryTsIndex
;
// primary timestamp slot id
STimeWindowAggSupp
twAggSup
;
SColumn
stateCol
;
SSDataBlock
*
pDelRes
;
SSHashObj
*
pSeDeleted
;
void
*
pDelIterator
;
SArray
*
pChildren
;
// cache for children's result;
bool
ignoreExpiredData
;
bool
ignoreExpiredDataSaved
;
SArray
*
pUpdated
;
SSHashObj
*
pSeUpdated
;
int64_t
dataVersion
;
}
SStreamStateAggOperatorInfo
;
typedef
struct
SStreamPartitionOperatorInfo
{
SOptrBasicInfo
binfo
;
SPartitionBySupporter
partitionSup
;
SExprSupp
scalarSup
;
SExprSupp
tbnameCalSup
;
SExprSupp
tagCalSup
;
SHashObj
*
pPartitions
;
void
*
parIte
;
void
*
pTbNameIte
;
SSDataBlock
*
pInputDataBlock
;
int32_t
tsColIndex
;
SSDataBlock
*
pDelRes
;
SSDataBlock
*
pCreateTbRes
;
}
SStreamPartitionOperatorInfo
;
typedef
struct
SStreamFillSupporter
{
int32_t
type
;
// fill type
SInterval
interval
;
SResultRowData
prev
;
SResultRowData
cur
;
SResultRowData
next
;
SResultRowData
nextNext
;
SFillColInfo
*
pAllColInfo
;
// fill exprs and not fill exprs
SExprSupp
notFillExprSup
;
int32_t
numOfAllCols
;
// number of all exprs, including the tags columns
int32_t
numOfFillCols
;
int32_t
numOfNotFillCols
;
int32_t
rowSize
;
SSHashObj
*
pResMap
;
bool
hasDelete
;
}
SStreamFillSupporter
;
typedef
struct
SStreamFillOperatorInfo
{
SStreamFillSupporter
*
pFillSup
;
SSDataBlock
*
pRes
;
SSDataBlock
*
pSrcBlock
;
int32_t
srcRowIndex
;
SSDataBlock
*
pSrcDelBlock
;
int32_t
srcDelRowIndex
;
SSDataBlock
*
pDelRes
;
SColMatchInfo
matchInfo
;
int32_t
primaryTsCol
;
int32_t
primarySrcSlotId
;
SStreamFillInfo
*
pFillInfo
;
}
SStreamFillOperatorInfo
;
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
SSchemaWrapper
*
extractQueriedColumnSchema
(
SScanPhysiNode
*
pScanNode
);
int32_t
initQueriedTableSchemaInfo
(
SReadHandle
*
pHandle
,
SScanPhysiNode
*
pScanNode
,
const
char
*
dbName
,
SExecTaskInfo
*
pTaskInfo
);
void
initBasicInfo
(
SOptrBasicInfo
*
pInfo
,
SSDataBlock
*
pBlock
);
void
cleanupBasicInfo
(
SOptrBasicInfo
*
pInfo
);
int32_t
initExprSupp
(
SExprSupp
*
pSup
,
SExprInfo
*
pExprInfo
,
int32_t
numOfExpr
);
void
cleanupExprSupp
(
SExprSupp
*
pSup
);
void
destroyExprInfo
(
SExprInfo
*
pExpr
,
int32_t
numOfExprs
);
int32_t
initAggSup
(
SExprSupp
*
pSup
,
SAggSupporter
*
pAggSup
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
size_t
keyBufSize
,
const
char
*
pkey
,
void
*
pState
);
void
cleanupAggSup
(
SAggSupporter
*
pAggSup
);
void
initResultSizeInfo
(
SResultInfo
*
pResultInfo
,
int32_t
numOfRows
);
void
doBuildStreamResBlock
(
struct
SOperatorInfo
*
pOperator
,
SOptrBasicInfo
*
pbInfo
,
SGroupResInfo
*
pGroupResInfo
,
SDiskbasedBuf
*
pBuf
);
void
doBuildResultDatablock
(
struct
SOperatorInfo
*
pOperator
,
SOptrBasicInfo
*
pbInfo
,
SGroupResInfo
*
pGroupResInfo
,
SDiskbasedBuf
*
pBuf
);
bool
hasLimitOffsetInfo
(
SLimitInfo
*
pLimitInfo
);
bool
hasSlimitOffsetInfo
(
SLimitInfo
*
pLimitInfo
);
void
initLimitInfo
(
const
SNode
*
pLimit
,
const
SNode
*
pSLimit
,
SLimitInfo
*
pLimitInfo
);
void
resetLimitInfoForNextGroup
(
SLimitInfo
*
pLimitInfo
);
bool
applyLimitOffset
(
SLimitInfo
*
pLimitInfo
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
);
void
applyAggFunctionOnPartialTuples
(
SExecTaskInfo
*
taskInfo
,
SqlFunctionCtx
*
pCtx
,
SColumnInfoData
*
pTimeWindowData
,
int32_t
offset
,
int32_t
forwardStep
,
int32_t
numOfTotal
,
int32_t
numOfOutput
);
int32_t
extractDataBlockFromFetchRsp
(
SSDataBlock
*
pRes
,
char
*
pData
,
SArray
*
pColList
,
char
**
pNextStart
);
void
updateLoadRemoteInfo
(
SLoadRemoteDataInfo
*
pInfo
,
int64_t
numOfRows
,
int32_t
dataLen
,
int64_t
startTs
,
struct
SOperatorInfo
*
pOperator
);
STimeWindow
getFirstQualifiedTimeWindow
(
int64_t
ts
,
STimeWindow
*
pWindow
,
SInterval
*
pInterval
,
int32_t
order
);
int32_t
getBufferPgSize
(
int32_t
rowSize
,
uint32_t
*
defaultPgsz
,
uint32_t
*
defaultBufsz
);
extern
void
doDestroyExchangeOperatorInfo
(
void
*
param
);
void
doFilter
(
SSDataBlock
*
pBlock
,
SFilterInfo
*
pFilterInfo
,
SColMatchInfo
*
pColMatchInfo
);
int32_t
addTagPseudoColumnData
(
SReadHandle
*
pHandle
,
const
SExprInfo
*
pExpr
,
int32_t
numOfExpr
,
SSDataBlock
*
pBlock
,
int32_t
rows
,
const
char
*
idStr
,
STableMetaCacheInfo
*
pCache
);
void
appendOneRowToDataBlock
(
SSDataBlock
*
pBlock
,
STupleHandle
*
pTupleHandle
);
void
setTbNameColData
(
const
SSDataBlock
*
pBlock
,
SColumnInfoData
*
pColInfoData
,
int32_t
functionId
,
const
char
*
name
);
void
setResultRowInitCtx
(
SResultRow
*
pResult
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
int32_t
*
rowEntryInfoOffset
);
void
clearResultRowInitFlag
(
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
);
SResultRow
*
doSetResultOutBufByKey
(
SDiskbasedBuf
*
pResultBuf
,
SResultRowInfo
*
pResultRowInfo
,
char
*
pData
,
int16_t
bytes
,
bool
masterscan
,
uint64_t
groupId
,
SExecTaskInfo
*
pTaskInfo
,
bool
isIntervalQuery
,
SAggSupporter
*
pSup
,
bool
keepGroup
);
int32_t
projectApplyFunctions
(
SExprInfo
*
pExpr
,
SSDataBlock
*
pResult
,
SSDataBlock
*
pSrcBlock
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
SArray
*
pPseudoList
);
void
setInputDataBlock
(
SExprSupp
*
pExprSupp
,
SSDataBlock
*
pBlock
,
int32_t
order
,
int32_t
scanFlag
,
bool
createDummyCol
);
int32_t
checkForQueryBuf
(
size_t
numOfTables
);
SArray
*
getTableListInfo
(
const
SExecTaskInfo
*
pTaskInfo
);
int32_t
createDataSinkParam
(
SDataSinkNode
*
pNode
,
void
**
pParam
,
SExecTaskInfo
*
pTask
,
SReadHandle
*
readHandle
);
int32_t
getOperatorExplainExecInfo
(
struct
SOperatorInfo
*
operatorInfo
,
SArray
*
pExecInfoList
);
STimeWindow
getActiveTimeWindow
(
SDiskbasedBuf
*
pBuf
,
SResultRowInfo
*
pResultRowInfo
,
int64_t
ts
,
SInterval
*
pInterval
,
int32_t
order
);
int32_t
getNumOfRowsInTimeWindow
(
SDataBlockInfo
*
pDataBlockInfo
,
TSKEY
*
pPrimaryColumn
,
int32_t
startPos
,
TSKEY
ekey
,
__block_search_fn_t
searchFn
,
STableQueryInfo
*
item
,
int32_t
order
);
int32_t
binarySearchForKey
(
char
*
pValue
,
int
num
,
TSKEY
key
,
int
order
);
SResultRow
*
getNewResultRow
(
SDiskbasedBuf
*
pResultBuf
,
int32_t
*
currentPageId
,
int32_t
interBufSize
);
void
getCurSessionWindow
(
SStreamAggSupporter
*
pAggSup
,
TSKEY
startTs
,
TSKEY
endTs
,
uint64_t
groupId
,
SSessionKey
*
pKey
);
bool
isInTimeWindow
(
STimeWindow
*
pWin
,
TSKEY
ts
,
int64_t
gap
);
bool
functionNeedToExecute
(
SqlFunctionCtx
*
pCtx
);
bool
isOverdue
(
TSKEY
ts
,
STimeWindowAggSupp
*
pSup
);
bool
isCloseWindow
(
STimeWindow
*
pWin
,
STimeWindowAggSupp
*
pSup
);
bool
isDeletedStreamWindow
(
STimeWindow
*
pWin
,
uint64_t
groupId
,
SStreamState
*
pState
,
STimeWindowAggSupp
*
pTwSup
);
void
appendOneRowToStreamSpecialBlock
(
SSDataBlock
*
pBlock
,
TSKEY
*
pStartTs
,
TSKEY
*
pEndTs
,
uint64_t
*
pUid
,
uint64_t
*
pGp
,
void
*
pTbName
);
uint64_t
calGroupIdByData
(
SPartitionBySupporter
*
pParSup
,
SExprSupp
*
pExprSup
,
SSDataBlock
*
pBlock
,
int32_t
rowId
);
int32_t
finalizeResultRows
(
SDiskbasedBuf
*
pBuf
,
SResultRowPosition
*
resultRowPosition
,
SExprSupp
*
pSup
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
);
bool
groupbyTbname
(
SNodeList
*
pGroupList
);
int32_t
buildDataBlockFromGroupRes
(
struct
SOperatorInfo
*
pOperator
,
SStreamState
*
pState
,
SSDataBlock
*
pBlock
,
SExprSupp
*
pSup
,
SGroupResInfo
*
pGroupResInfo
);
int32_t
saveSessionDiscBuf
(
SStreamState
*
pState
,
SSessionKey
*
key
,
void
*
buf
,
int32_t
size
);
int32_t
buildSessionResultDataBlock
(
struct
SOperatorInfo
*
pOperator
,
SStreamState
*
pState
,
SSDataBlock
*
pBlock
,
SExprSupp
*
pSup
,
SGroupResInfo
*
pGroupResInfo
);
int32_t
setOutputBuf
(
SStreamState
*
pState
,
STimeWindow
*
win
,
SResultRow
**
pResult
,
int64_t
tableGroupId
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
int32_t
*
rowEntryInfoOffset
,
SAggSupporter
*
pAggSup
);
int32_t
releaseOutputBuf
(
SStreamState
*
pState
,
SWinKey
*
pKey
,
SResultRow
*
pResult
);
int32_t
saveOutputBuf
(
SStreamState
*
pState
,
SWinKey
*
pKey
,
SResultRow
*
pResult
,
int32_t
resSize
);
void
getNextIntervalWindow
(
SInterval
*
pInterval
,
STimeWindow
*
tw
,
int32_t
order
);
int32_t
getForwardStepsInBlock
(
int32_t
numOfRows
,
__block_search_fn_t
searchFn
,
TSKEY
ekey
,
int32_t
pos
,
int32_t
order
,
int64_t
*
pData
);
void
appendCreateTableRow
(
SStreamState
*
pState
,
SExprSupp
*
pTableSup
,
SExprSupp
*
pTagSup
,
uint64_t
groupId
,
SSDataBlock
*
pSrcBlock
,
int32_t
rowId
,
SSDataBlock
*
pDestBlock
);
SSDataBlock
*
buildCreateTableBlock
(
SExprSupp
*
tbName
,
SExprSupp
*
tag
);
SExprInfo
*
createExpr
(
SNodeList
*
pNodeList
,
int32_t
*
numOfExprs
);
void
copyResultrowToDataBlock
(
SExprInfo
*
pExprInfo
,
int32_t
numOfExprs
,
SResultRow
*
pRow
,
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
,
const
int32_t
*
rowEntryOffset
,
SExecTaskInfo
*
pTaskInfo
);
void
doUpdateNumOfRows
(
SqlFunctionCtx
*
pCtx
,
SResultRow
*
pRow
,
int32_t
numOfExprs
,
const
int32_t
*
rowEntryOffset
);
void
doClearBufferedBlocks
(
SStreamScanInfo
*
pInfo
);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_EXECUTORIMPL_H
source/libs/executor/inc/operator.h
浏览文件 @
a0c2b347
...
@@ -157,6 +157,7 @@ void destroyOperator(SOperatorInfo* pOperator);
...
@@ -157,6 +157,7 @@ void destroyOperator(SOperatorInfo* pOperator);
SOperatorInfo
*
extractOperatorInTree
(
SOperatorInfo
*
pOperator
,
int32_t
type
,
const
char
*
id
);
SOperatorInfo
*
extractOperatorInTree
(
SOperatorInfo
*
pOperator
,
int32_t
type
,
const
char
*
id
);
int32_t
getTableScanInfo
(
SOperatorInfo
*
pOperator
,
int32_t
*
order
,
int32_t
*
scanFlag
,
bool
inheritUsOrder
);
int32_t
getTableScanInfo
(
SOperatorInfo
*
pOperator
,
int32_t
*
order
,
int32_t
*
scanFlag
,
bool
inheritUsOrder
);
int32_t
stopTableScanOperator
(
SOperatorInfo
*
pOperator
,
const
char
*
pIdStr
);
int32_t
stopTableScanOperator
(
SOperatorInfo
*
pOperator
,
const
char
*
pIdStr
);
int32_t
getOperatorExplainExecInfo
(
struct
SOperatorInfo
*
operatorInfo
,
SArray
*
pExecInfoList
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
...
...
source/libs/executor/inc/querytask.h
浏览文件 @
a0c2b347
...
@@ -22,6 +22,17 @@ extern "C" {
...
@@ -22,6 +22,17 @@ extern "C" {
#define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.str)
#define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.str)
enum
{
// when this task starts to execute, this status will set
TASK_NOT_COMPLETED
=
0x1u
,
/* Task is over
* 1. this status is used in one row result query process, e.g., count/sum/first/last/ avg...etc.
* 2. when all data within queried time window, it is also denoted as query_completed
*/
TASK_COMPLETED
=
0x2u
,
};
typedef
struct
STaskIdInfo
{
typedef
struct
STaskIdInfo
{
uint64_t
queryId
;
// this is also a request id
uint64_t
queryId
;
// this is also a request id
uint64_t
subplanId
;
uint64_t
subplanId
;
...
@@ -44,6 +55,23 @@ typedef struct STaskStopInfo {
...
@@ -44,6 +55,23 @@ typedef struct STaskStopInfo {
SArray
*
pStopInfo
;
SArray
*
pStopInfo
;
}
STaskStopInfo
;
}
STaskStopInfo
;
typedef
struct
{
STqOffsetVal
currentOffset
;
// for tmq
SMqMetaRsp
metaRsp
;
// for tmq fetching meta
int64_t
snapshotVer
;
SPackedData
submit
;
// todo remove it
SSchemaWrapper
*
schema
;
char
tbName
[
TSDB_TABLE_NAME_LEN
];
// this is the current scan table: todo refactor
int8_t
recoverStep
;
int8_t
recoverScanFinished
;
SQueryTableDataCond
tableCond
;
int64_t
fillHistoryVer1
;
int64_t
fillHistoryVer2
;
SStreamState
*
pState
;
int64_t
dataVersion
;
int64_t
checkPointId
;
}
SStreamTaskInfo
;
struct
SExecTaskInfo
{
struct
SExecTaskInfo
{
STaskIdInfo
id
;
STaskIdInfo
id
;
uint32_t
status
;
uint32_t
status
;
...
@@ -75,6 +103,7 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
...
@@ -75,6 +103,7 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
int32_t
createExecTaskInfo
(
SSubplan
*
pPlan
,
SExecTaskInfo
**
pTaskInfo
,
SReadHandle
*
pHandle
,
uint64_t
taskId
,
int32_t
createExecTaskInfo
(
SSubplan
*
pPlan
,
SExecTaskInfo
**
pTaskInfo
,
SReadHandle
*
pHandle
,
uint64_t
taskId
,
int32_t
vgId
,
char
*
sql
,
EOPTR_EXEC_MODEL
model
);
int32_t
vgId
,
char
*
sql
,
EOPTR_EXEC_MODEL
model
);
int32_t
qAppendTaskStopInfo
(
SExecTaskInfo
*
pTaskInfo
,
SExchangeOpStopInfo
*
pInfo
);
int32_t
qAppendTaskStopInfo
(
SExecTaskInfo
*
pTaskInfo
,
SExchangeOpStopInfo
*
pInfo
);
SArray
*
getTableListInfo
(
const
SExecTaskInfo
*
pTaskInfo
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
...
...
source/libs/executor/src/aggregateoperator.c
浏览文件 @
a0c2b347
...
@@ -20,16 +20,16 @@
...
@@ -20,16 +20,16 @@
#include "tfill.h"
#include "tfill.h"
#include "tname.h"
#include "tname.h"
#include "tdatablock.h"
#include "executorInt.h"
#include "tglobal.h"
#include "executorimpl.h"
#include "index.h"
#include "index.h"
#include "operator.h"
#include "query.h"
#include "query.h"
#include "querytask.h"
#include "tcompare.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "tglobal.h"
#include "thash.h"
#include "thash.h"
#include "ttypes.h"
#include "ttypes.h"
#include "operator.h"
#include "querytask.h"
typedef
struct
{
typedef
struct
{
bool
hasAgg
;
bool
hasAgg
;
...
...
source/libs/executor/src/cachescanoperator.c
浏览文件 @
a0c2b347
...
@@ -20,12 +20,12 @@
...
@@ -20,12 +20,12 @@
#include "tdatablock.h"
#include "tdatablock.h"
#include "tmsg.h"
#include "tmsg.h"
#include "executorimpl.h"
#include "executorInt.h"
#include "operator.h"
#include "querytask.h"
#include "tcompare.h"
#include "tcompare.h"
#include "thash.h"
#include "thash.h"
#include "ttypes.h"
#include "ttypes.h"
#include "operator.h"
#include "querytask.h"
typedef
struct
SCacheRowsScanInfo
{
typedef
struct
SCacheRowsScanInfo
{
SSDataBlock
*
pRes
;
SSDataBlock
*
pRes
;
...
...
source/libs/executor/src/dataDeleter.c
浏览文件 @
a0c2b347
...
@@ -15,7 +15,7 @@
...
@@ -15,7 +15,7 @@
#include "dataSinkInt.h"
#include "dataSinkInt.h"
#include "dataSinkMgt.h"
#include "dataSinkMgt.h"
#include "executor
impl
.h"
#include "executor
Int
.h"
#include "planner.h"
#include "planner.h"
#include "tcompression.h"
#include "tcompression.h"
#include "tdatablock.h"
#include "tdatablock.h"
...
...
source/libs/executor/src/dataDispatcher.c
浏览文件 @
a0c2b347
...
@@ -15,7 +15,7 @@
...
@@ -15,7 +15,7 @@
#include "dataSinkInt.h"
#include "dataSinkInt.h"
#include "dataSinkMgt.h"
#include "dataSinkMgt.h"
#include "executor
impl
.h"
#include "executor
Int
.h"
#include "planner.h"
#include "planner.h"
#include "tcompression.h"
#include "tcompression.h"
#include "tdatablock.h"
#include "tdatablock.h"
...
...
source/libs/executor/src/dataInserter.c
浏览文件 @
a0c2b347
...
@@ -15,7 +15,7 @@
...
@@ -15,7 +15,7 @@
#include "dataSinkInt.h"
#include "dataSinkInt.h"
#include "dataSinkMgt.h"
#include "dataSinkMgt.h"
#include "executor
impl
.h"
#include "executor
Int
.h"
#include "planner.h"
#include "planner.h"
#include "tcompression.h"
#include "tcompression.h"
#include "tdatablock.h"
#include "tdatablock.h"
...
...
source/libs/executor/src/eventwindowoperator.c
浏览文件 @
a0c2b347
...
@@ -13,16 +13,16 @@
...
@@ -13,16 +13,16 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#include "executor
impl
.h"
#include "executor
Int
.h"
#include "filter.h"
#include "filter.h"
#include "function.h"
#include "function.h"
#include "functionMgt.h"
#include "functionMgt.h"
#include "operator.h"
#include "querytask.h"
#include "tcommon.h"
#include "tcommon.h"
#include "tcompare.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "tdatablock.h"
#include "ttime.h"
#include "ttime.h"
#include "operator.h"
#include "querytask.h"
typedef
struct
SEventWindowOperatorInfo
{
typedef
struct
SEventWindowOperatorInfo
{
SOptrBasicInfo
binfo
;
SOptrBasicInfo
binfo
;
...
...
source/libs/executor/src/exchangeoperator.c
浏览文件 @
a0c2b347
...
@@ -13,19 +13,19 @@
...
@@ -13,19 +13,19 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#include "executorInt.h"
#include "filter.h"
#include "filter.h"
#include "function.h"
#include "function.h"
#include "os.h"
#include "tname.h"
#include "tref.h"
#include "tdatablock.h"
#include "tmsg.h"
#include "executorimpl.h"
#include "index.h"
#include "index.h"
#include "query.h"
#include "thash.h"
#include "operator.h"
#include "operator.h"
#include "os.h"
#include "query.h"
#include "querytask.h"
#include "querytask.h"
#include "tdatablock.h"
#include "thash.h"
#include "tmsg.h"
#include "tname.h"
#include "tref.h"
typedef
struct
SFetchRspHandleWrapper
{
typedef
struct
SFetchRspHandleWrapper
{
uint32_t
exchangeId
;
uint32_t
exchangeId
;
...
...
source/libs/executor/src/executil.c
浏览文件 @
a0c2b347
...
@@ -24,9 +24,9 @@
...
@@ -24,9 +24,9 @@
#include "ttime.h"
#include "ttime.h"
#include "executil.h"
#include "executil.h"
#include "executorimpl.h"
#include "executorInt.h"
#include "tcompression.h"
#include "querytask.h"
#include "querytask.h"
#include "tcompression.h"
typedef
struct
STableListIdInfo
{
typedef
struct
STableListIdInfo
{
uint64_t
suid
;
uint64_t
suid
;
...
...
source/libs/executor/src/executor.c
浏览文件 @
a0c2b347
...
@@ -14,14 +14,14 @@
...
@@ -14,14 +14,14 @@
*/
*/
#include "executor.h"
#include "executor.h"
#include "executorimpl.h"
#include "executorInt.h"
#include "operator.h"
#include "planner.h"
#include "planner.h"
#include "querytask.h"
#include "tdatablock.h"
#include "tdatablock.h"
#include "tref.h"
#include "tref.h"
#include "tudf.h"
#include "tudf.h"
#include "vnode.h"
#include "vnode.h"
#include "operator.h"
#include "querytask.h"
static
TdThreadOnce
initPoolOnce
=
PTHREAD_ONCE_INIT
;
static
TdThreadOnce
initPoolOnce
=
PTHREAD_ONCE_INIT
;
int32_t
exchangeObjRefPool
=
-
1
;
int32_t
exchangeObjRefPool
=
-
1
;
...
@@ -718,8 +718,6 @@ void qRemoveTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo) {
...
@@ -718,8 +718,6 @@ void qRemoveTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo) {
taosArrayRemove
(
pTaskInfo
->
stopInfo
.
pStopInfo
,
idx
);
taosArrayRemove
(
pTaskInfo
->
stopInfo
.
pStopInfo
,
idx
);
}
}
taosWUnLockLatch
(
&
pTaskInfo
->
stopInfo
.
lock
);
taosWUnLockLatch
(
&
pTaskInfo
->
stopInfo
.
lock
);
return
;
}
}
void
qStopTaskOperators
(
SExecTaskInfo
*
pTaskInfo
)
{
void
qStopTaskOperators
(
SExecTaskInfo
*
pTaskInfo
)
{
...
@@ -1304,3 +1302,25 @@ SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo) {
...
@@ -1304,3 +1302,25 @@ SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo) {
taosArrayDestroy
(
plist
);
taosArrayDestroy
(
plist
);
return
pUidList
;
return
pUidList
;
}
}
static
void
extractTableList
(
SArray
*
pList
,
const
SOperatorInfo
*
pOperator
)
{
if
(
pOperator
->
operatorType
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
SStreamScanInfo
*
pScanInfo
=
pOperator
->
info
;
STableScanInfo
*
pTableScanInfo
=
pScanInfo
->
pTableScanOp
->
info
;
taosArrayPush
(
pList
,
&
pTableScanInfo
->
base
.
pTableListInfo
);
}
else
if
(
pOperator
->
operatorType
==
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
)
{
STableScanInfo
*
pScanInfo
=
pOperator
->
info
;
taosArrayPush
(
pList
,
&
pScanInfo
->
base
.
pTableListInfo
);
}
else
{
if
(
pOperator
->
pDownstream
!=
NULL
&&
pOperator
->
pDownstream
[
0
]
!=
NULL
)
{
extractTableList
(
pList
,
pOperator
->
pDownstream
[
0
]);
}
}
}
SArray
*
getTableListInfo
(
const
SExecTaskInfo
*
pTaskInfo
)
{
SArray
*
pArray
=
taosArrayInit
(
0
,
POINTER_BYTES
);
SOperatorInfo
*
pOperator
=
pTaskInfo
->
pRoot
;
extractTableList
(
pArray
,
pOperator
);
return
pArray
;
}
\ No newline at end of file
source/libs/executor/src/executor
impl
.c
→
source/libs/executor/src/executor
Int
.c
浏览文件 @
a0c2b347
...
@@ -25,15 +25,15 @@
...
@@ -25,15 +25,15 @@
#include "tmsg.h"
#include "tmsg.h"
#include "ttime.h"
#include "ttime.h"
#include "executor
impl
.h"
#include "executor
Int
.h"
#include "index.h"
#include "index.h"
#include "operator.h"
#include "query.h"
#include "query.h"
#include "querytask.h"
#include "tcompare.h"
#include "tcompare.h"
#include "thash.h"
#include "thash.h"
#include "ttypes.h"
#include "ttypes.h"
#include "vnode.h"
#include "vnode.h"
#include "operator.h"
#include "querytask.h"
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
...
@@ -1059,37 +1059,6 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo*
...
@@ -1059,37 +1059,6 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo*
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
getOperatorExplainExecInfo
(
SOperatorInfo
*
operatorInfo
,
SArray
*
pExecInfoList
)
{
SExplainExecInfo
execInfo
=
{
0
};
SExplainExecInfo
*
pExplainInfo
=
taosArrayPush
(
pExecInfoList
,
&
execInfo
);
pExplainInfo
->
numOfRows
=
operatorInfo
->
resultInfo
.
totalRows
;
pExplainInfo
->
startupCost
=
operatorInfo
->
cost
.
openCost
;
pExplainInfo
->
totalCost
=
operatorInfo
->
cost
.
totalCost
;
pExplainInfo
->
verboseLen
=
0
;
pExplainInfo
->
verboseInfo
=
NULL
;
if
(
operatorInfo
->
fpSet
.
getExplainFn
)
{
int32_t
code
=
operatorInfo
->
fpSet
.
getExplainFn
(
operatorInfo
,
&
pExplainInfo
->
verboseInfo
,
&
pExplainInfo
->
verboseLen
);
if
(
code
)
{
qError
(
"%s operator getExplainFn failed, code:%s"
,
GET_TASKID
(
operatorInfo
->
pTaskInfo
),
tstrerror
(
code
));
return
code
;
}
}
int32_t
code
=
0
;
for
(
int32_t
i
=
0
;
i
<
operatorInfo
->
numOfDownstream
;
++
i
)
{
code
=
getOperatorExplainExecInfo
(
operatorInfo
->
pDownstream
[
i
],
pExecInfoList
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
// taosMemoryFreeClear(*pRes);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
setOutputBuf
(
SStreamState
*
pState
,
STimeWindow
*
win
,
SResultRow
**
pResult
,
int64_t
tableGroupId
,
int32_t
setOutputBuf
(
SStreamState
*
pState
,
STimeWindow
*
win
,
SResultRow
**
pResult
,
int64_t
tableGroupId
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
int32_t
*
rowEntryInfoOffset
,
SAggSupporter
*
pAggSup
)
{
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
int32_t
*
rowEntryInfoOffset
,
SAggSupporter
*
pAggSup
)
{
SWinKey
key
=
{
SWinKey
key
=
{
...
@@ -1331,25 +1300,3 @@ void qStreamCloseTsdbReader(void* task) {
...
@@ -1331,25 +1300,3 @@ void qStreamCloseTsdbReader(void* task) {
}
}
}
}
}
}
static
void
extractTableList
(
SArray
*
pList
,
const
SOperatorInfo
*
pOperator
)
{
if
(
pOperator
->
operatorType
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
SStreamScanInfo
*
pScanInfo
=
pOperator
->
info
;
STableScanInfo
*
pTableScanInfo
=
pScanInfo
->
pTableScanOp
->
info
;
taosArrayPush
(
pList
,
&
pTableScanInfo
->
base
.
pTableListInfo
);
}
else
if
(
pOperator
->
operatorType
==
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
)
{
STableScanInfo
*
pScanInfo
=
pOperator
->
info
;
taosArrayPush
(
pList
,
&
pScanInfo
->
base
.
pTableListInfo
);
}
else
{
if
(
pOperator
->
pDownstream
!=
NULL
&&
pOperator
->
pDownstream
[
0
]
!=
NULL
)
{
extractTableList
(
pList
,
pOperator
->
pDownstream
[
0
]);
}
}
}
SArray
*
getTableListInfo
(
const
SExecTaskInfo
*
pTaskInfo
)
{
SArray
*
pArray
=
taosArrayInit
(
0
,
POINTER_BYTES
);
SOperatorInfo
*
pOperator
=
pTaskInfo
->
pRoot
;
extractTableList
(
pArray
,
pOperator
);
return
pArray
;
}
\ No newline at end of file
source/libs/executor/src/filloperator.c
浏览文件 @
a0c2b347
...
@@ -20,7 +20,7 @@
...
@@ -20,7 +20,7 @@
#include "tmsg.h"
#include "tmsg.h"
#include "ttypes.h"
#include "ttypes.h"
#include "executor
impl
.h"
#include "executor
Int
.h"
#include "tcommon.h"
#include "tcommon.h"
#include "thash.h"
#include "thash.h"
#include "ttime.h"
#include "ttime.h"
...
...
source/libs/executor/src/groupoperator.c
浏览文件 @
a0c2b347
...
@@ -22,12 +22,11 @@
...
@@ -22,12 +22,11 @@
#include "tmsg.h"
#include "tmsg.h"
#include "executorInt.h"
#include "executorInt.h"
#include "executorimpl.h"
#include "operator.h"
#include "querytask.h"
#include "tcompare.h"
#include "tcompare.h"
#include "thash.h"
#include "thash.h"
#include "ttypes.h"
#include "ttypes.h"
#include "operator.h"
#include "querytask.h"
typedef
struct
SGroupbyOperatorInfo
{
typedef
struct
SGroupbyOperatorInfo
{
SOptrBasicInfo
binfo
;
SOptrBasicInfo
binfo
;
...
...
source/libs/executor/src/joinoperator.c
浏览文件 @
a0c2b347
...
@@ -13,18 +13,18 @@
...
@@ -13,18 +13,18 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#include "executorInt.h"
#include "filter.h"
#include "filter.h"
#include "executorimpl.h"
#include "function.h"
#include "function.h"
#include "operator.h"
#include "os.h"
#include "os.h"
#include "querynodes.h"
#include "querynodes.h"
#include "querytask.h"
#include "tcompare.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "tdatablock.h"
#include "thash.h"
#include "thash.h"
#include "tmsg.h"
#include "tmsg.h"
#include "ttypes.h"
#include "ttypes.h"
#include "operator.h"
#include "querytask.h"
typedef
struct
SJoinRowCtx
{
typedef
struct
SJoinRowCtx
{
bool
rowRemains
;
bool
rowRemains
;
...
...
source/libs/executor/src/operator.c
浏览文件 @
a0c2b347
...
@@ -20,12 +20,12 @@
...
@@ -20,12 +20,12 @@
#include "tglobal.h"
#include "tglobal.h"
#include "executor
impl
.h"
#include "executor
Int
.h"
#include "index.h"
#include "index.h"
#include "query.h"
#include "vnode.h"
#include "operator.h"
#include "operator.h"
#include "query.h"
#include "querytask.h"
#include "querytask.h"
#include "vnode.h"
SOperatorFpSet
createOperatorFpSet
(
__optr_open_fn_t
openFn
,
__optr_fn_t
nextFn
,
__optr_fn_t
cleanup
,
SOperatorFpSet
createOperatorFpSet
(
__optr_open_fn_t
openFn
,
__optr_fn_t
nextFn
,
__optr_fn_t
cleanup
,
__optr_close_fn_t
closeFn
,
__optr_reqBuf_fn_t
reqBufFn
,
__optr_close_fn_t
closeFn
,
__optr_reqBuf_fn_t
reqBufFn
,
...
@@ -75,28 +75,6 @@ void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, b
...
@@ -75,28 +75,6 @@ void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, b
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
}
}
void
destroyOperator
(
SOperatorInfo
*
pOperator
)
{
if
(
pOperator
==
NULL
)
{
return
;
}
if
(
pOperator
->
fpSet
.
closeFn
!=
NULL
)
{
pOperator
->
fpSet
.
closeFn
(
pOperator
->
info
);
}
if
(
pOperator
->
pDownstream
!=
NULL
)
{
for
(
int32_t
i
=
0
;
i
<
pOperator
->
numOfDownstream
;
++
i
)
{
destroyOperator
(
pOperator
->
pDownstream
[
i
]);
}
taosMemoryFreeClear
(
pOperator
->
pDownstream
);
pOperator
->
numOfDownstream
=
0
;
}
cleanupExprSupp
(
&
pOperator
->
exprSupp
);
taosMemoryFreeClear
(
pOperator
);
}
// each operator should be set their own function to return total cost buffer
// each operator should be set their own function to return total cost buffer
int32_t
optrDefaultBufFn
(
SOperatorInfo
*
pOperator
)
{
int32_t
optrDefaultBufFn
(
SOperatorInfo
*
pOperator
)
{
if
(
pOperator
->
blocking
)
{
if
(
pOperator
->
blocking
)
{
...
@@ -106,40 +84,6 @@ int32_t optrDefaultBufFn(SOperatorInfo* pOperator) {
...
@@ -106,40 +84,6 @@ int32_t optrDefaultBufFn(SOperatorInfo* pOperator) {
}
}
}
}
//int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder) {
// // todo add more information about exchange operation
// int32_t type = pOperator->operatorType;
// if (type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN ||
// type == QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN ||
// type == QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN) {
// *order = TSDB_ORDER_ASC;
// *scanFlag = MAIN_SCAN;
// return TSDB_CODE_SUCCESS;
// } else if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) {
// if (!inheritUsOrder) {
// *order = TSDB_ORDER_ASC;
// }
// *scanFlag = MAIN_SCAN;
// return TSDB_CODE_SUCCESS;
// } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
// STableScanInfo* pTableScanInfo = pOperator->info;
// *order = pTableScanInfo->base.cond.order;
// *scanFlag = pTableScanInfo->base.scanFlag;
// return TSDB_CODE_SUCCESS;
// } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN) {
// STableMergeScanInfo* pTableScanInfo = pOperator->info;
// *order = pTableScanInfo->base.cond.order;
// *scanFlag = pTableScanInfo->base.scanFlag;
// return TSDB_CODE_SUCCESS;
// } else {
// if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) {
// return TSDB_CODE_INVALID_PARA;
// } else {
// return getTableScanInfo(pOperator->pDownstream[0], order, scanFlag, inheritUsOrder);
// }
// }
//}
static
int64_t
getQuerySupportBufSize
(
size_t
numOfTables
)
{
static
int64_t
getQuerySupportBufSize
(
size_t
numOfTables
)
{
size_t
s1
=
sizeof
(
STableQueryInfo
);
size_t
s1
=
sizeof
(
STableQueryInfo
);
// size_t s3 = sizeof(STableCheckInfo); buffer consumption in tsdb
// size_t s3 = sizeof(STableCheckInfo); buffer consumption in tsdb
...
@@ -347,7 +291,7 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR
...
@@ -347,7 +291,7 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR
code
=
initQueriedTableSchemaInfo
(
pHandle
,
&
pTableScanNode
->
scan
,
dbname
,
pTaskInfo
);
code
=
initQueriedTableSchemaInfo
(
pHandle
,
&
pTableScanNode
->
scan
,
dbname
,
pTaskInfo
);
if
(
code
)
{
if
(
code
)
{
pTaskInfo
->
code
=
terrno
;
pTaskInfo
->
code
=
code
;
tableListDestroy
(
pTableListInfo
);
tableListDestroy
(
pTableListInfo
);
return
NULL
;
return
NULL
;
}
}
...
@@ -355,6 +299,7 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR
...
@@ -355,6 +299,7 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR
pOperator
=
createTableScanOperatorInfo
(
pTableScanNode
,
pHandle
,
pTableListInfo
,
pTaskInfo
);
pOperator
=
createTableScanOperatorInfo
(
pTableScanNode
,
pHandle
,
pTableListInfo
,
pTaskInfo
);
if
(
NULL
==
pOperator
)
{
if
(
NULL
==
pOperator
)
{
pTaskInfo
->
code
=
terrno
;
pTaskInfo
->
code
=
terrno
;
tableListDestroy
(
pTableListInfo
);
return
NULL
;
return
NULL
;
}
}
...
@@ -578,3 +523,56 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR
...
@@ -578,3 +523,56 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR
return
pOptr
;
return
pOptr
;
}
}
void
destroyOperator
(
SOperatorInfo
*
pOperator
)
{
if
(
pOperator
==
NULL
)
{
return
;
}
if
(
pOperator
->
fpSet
.
closeFn
!=
NULL
)
{
pOperator
->
fpSet
.
closeFn
(
pOperator
->
info
);
}
if
(
pOperator
->
pDownstream
!=
NULL
)
{
for
(
int32_t
i
=
0
;
i
<
pOperator
->
numOfDownstream
;
++
i
)
{
destroyOperator
(
pOperator
->
pDownstream
[
i
]);
}
taosMemoryFreeClear
(
pOperator
->
pDownstream
);
pOperator
->
numOfDownstream
=
0
;
}
cleanupExprSupp
(
&
pOperator
->
exprSupp
);
taosMemoryFreeClear
(
pOperator
);
}
int32_t
getOperatorExplainExecInfo
(
SOperatorInfo
*
operatorInfo
,
SArray
*
pExecInfoList
)
{
SExplainExecInfo
execInfo
=
{
0
};
SExplainExecInfo
*
pExplainInfo
=
taosArrayPush
(
pExecInfoList
,
&
execInfo
);
pExplainInfo
->
numOfRows
=
operatorInfo
->
resultInfo
.
totalRows
;
pExplainInfo
->
startupCost
=
operatorInfo
->
cost
.
openCost
;
pExplainInfo
->
totalCost
=
operatorInfo
->
cost
.
totalCost
;
pExplainInfo
->
verboseLen
=
0
;
pExplainInfo
->
verboseInfo
=
NULL
;
if
(
operatorInfo
->
fpSet
.
getExplainFn
)
{
int32_t
code
=
operatorInfo
->
fpSet
.
getExplainFn
(
operatorInfo
,
&
pExplainInfo
->
verboseInfo
,
&
pExplainInfo
->
verboseLen
);
if
(
code
)
{
qError
(
"%s operator getExplainFn failed, code:%s"
,
GET_TASKID
(
operatorInfo
->
pTaskInfo
),
tstrerror
(
code
));
return
code
;
}
}
int32_t
code
=
0
;
for
(
int32_t
i
=
0
;
i
<
operatorInfo
->
numOfDownstream
;
++
i
)
{
code
=
getOperatorExplainExecInfo
(
operatorInfo
->
pDownstream
[
i
],
pExecInfoList
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
// taosMemoryFreeClear(*pRes);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
}
return
TSDB_CODE_SUCCESS
;
}
source/libs/executor/src/projectoperator.c
浏览文件 @
a0c2b347
...
@@ -13,7 +13,7 @@
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#include "executor
impl
.h"
#include "executor
Int
.h"
#include "filter.h"
#include "filter.h"
#include "functionMgt.h"
#include "functionMgt.h"
#include "operator.h"
#include "operator.h"
...
...
source/libs/executor/src/querytask.c
浏览文件 @
a0c2b347
...
@@ -24,14 +24,14 @@
...
@@ -24,14 +24,14 @@
#include "tdatablock.h"
#include "tdatablock.h"
#include "tmsg.h"
#include "tmsg.h"
#include "executor
impl
.h"
#include "executor
Int
.h"
#include "index.h"
#include "index.h"
#include "operator.h"
#include "query.h"
#include "query.h"
#include "querytask.h"
#include "thash.h"
#include "thash.h"
#include "ttypes.h"
#include "ttypes.h"
#include "vnode.h"
#include "vnode.h"
#include "operator.h"
#include "querytask.h"
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
a0c2b347
...
@@ -13,7 +13,7 @@
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#include "executor
impl
.h"
#include "executor
Int
.h"
#include "filter.h"
#include "filter.h"
#include "function.h"
#include "function.h"
#include "functionMgt.h"
#include "functionMgt.h"
...
...
source/libs/executor/src/sortoperator.c
浏览文件 @
a0c2b347
...
@@ -13,11 +13,11 @@
...
@@ -13,11 +13,11 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#include "executorInt.h"
#include "filter.h"
#include "filter.h"
#include "executorimpl.h"
#include "tdatablock.h"
#include "operator.h"
#include "operator.h"
#include "querytask.h"
#include "querytask.h"
#include "tdatablock.h"
typedef
struct
SSortOperatorInfo
{
typedef
struct
SSortOperatorInfo
{
SOptrBasicInfo
binfo
;
SOptrBasicInfo
binfo
;
...
...
source/libs/executor/src/sysscanoperator.c
浏览文件 @
a0c2b347
...
@@ -13,7 +13,7 @@
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#include "executor
impl
.h"
#include "executor
Int
.h"
#include "filter.h"
#include "filter.h"
#include "function.h"
#include "function.h"
#include "functionMgt.h"
#include "functionMgt.h"
...
...
source/libs/executor/src/tfill.c
浏览文件 @
a0c2b347
...
@@ -20,7 +20,7 @@
...
@@ -20,7 +20,7 @@
#include "tmsg.h"
#include "tmsg.h"
#include "ttypes.h"
#include "ttypes.h"
#include "executor
impl
.h"
#include "executor
Int
.h"
#include "tcommon.h"
#include "tcommon.h"
#include "thash.h"
#include "thash.h"
#include "ttime.h"
#include "ttime.h"
...
...
source/libs/executor/src/timesliceoperator.c
浏览文件 @
a0c2b347
...
@@ -12,17 +12,17 @@
...
@@ -12,17 +12,17 @@
* You should have received a copy of the GNU Affero General Public License
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#include "executor
impl
.h"
#include "executor
Int
.h"
#include "filter.h"
#include "filter.h"
#include "function.h"
#include "function.h"
#include "functionMgt.h"
#include "functionMgt.h"
#include "operator.h"
#include "querytask.h"
#include "tcommon.h"
#include "tcommon.h"
#include "tcompare.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "tdatablock.h"
#include "tfill.h"
#include "tfill.h"
#include "ttime.h"
#include "ttime.h"
#include "operator.h"
#include "querytask.h"
typedef
struct
STimeSliceOperatorInfo
{
typedef
struct
STimeSliceOperatorInfo
{
SSDataBlock
*
pRes
;
SSDataBlock
*
pRes
;
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
a0c2b347
...
@@ -12,21 +12,27 @@
...
@@ -12,21 +12,27 @@
* You should have received a copy of the GNU Affero General Public License
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#include "executor
impl
.h"
#include "executor
Int
.h"
#include "filter.h"
#include "filter.h"
#include "function.h"
#include "function.h"
#include "functionMgt.h"
#include "functionMgt.h"
#include "operator.h"
#include "querytask.h"
#include "tcommon.h"
#include "tcommon.h"
#include "tcompare.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "tdatablock.h"
#include "tfill.h"
#include "tfill.h"
#include "ttime.h"
#include "ttime.h"
#include "operator.h"
#include "querytask.h"
#define IS_FINAL_OP(op) ((op)->isFinal)
#define IS_FINAL_OP(op) ((op)->isFinal)
#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL);
#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL);
typedef
struct
SStateWindowInfo
{
SResultWindowInfo
winInfo
;
SStateKeys
*
pStateKey
;
}
SStateWindowInfo
;
typedef
struct
SSessionAggOperatorInfo
{
typedef
struct
SSessionAggOperatorInfo
{
SOptrBasicInfo
binfo
;
SOptrBasicInfo
binfo
;
SAggSupporter
aggSup
;
SAggSupporter
aggSup
;
...
...
source/libs/executor/test/executorTests.cpp
浏览文件 @
a0c2b347
...
@@ -24,13 +24,13 @@
...
@@ -24,13 +24,13 @@
#include "os.h"
#include "os.h"
#include "executor.h"
#include "executor.h"
#include "executor
impl
.h"
#include "executor
Int
.h"
#include "function.h"
#include "function.h"
#include "operator.h"
#include "taos.h"
#include "taos.h"
#include "tdatablock.h"
#include "tdatablock.h"
#include "tdef.h"
#include "tdef.h"
#include "tvariant.h"
#include "tvariant.h"
#include "operator.h"
namespace
{
namespace
{
...
...
source/libs/executor/test/lhashTests.cpp
浏览文件 @
a0c2b347
...
@@ -15,7 +15,7 @@
...
@@ -15,7 +15,7 @@
#include <gtest/gtest.h>
#include <gtest/gtest.h>
#include <iostream>
#include <iostream>
#include "executor
impl
.h"
#include "executor
Int
.h"
#include "tlinearhash.h"
#include "tlinearhash.h"
#pragma GCC diagnostic push
#pragma GCC diagnostic push
...
...
source/libs/executor/test/sortTests.cpp
浏览文件 @
a0c2b347
...
@@ -26,7 +26,7 @@
...
@@ -26,7 +26,7 @@
#include "os.h"
#include "os.h"
#include "executor.h"
#include "executor.h"
#include "executor
impl
.h"
#include "executor
Int
.h"
#include "taos.h"
#include "taos.h"
#include "tcompare.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "tdatablock.h"
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录