Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
7adc2102
T
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
7adc2102
编写于
4月 28, 2023
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor: do some internal refactor.
上级
25010cb1
变更
23
展开全部
隐藏空白更改
内联
并排
Showing
23 changed file
with
808 addition
and
697 deletion
+808
-697
source/libs/executor/inc/executil.h
source/libs/executor/inc/executil.h
+0
-2
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+10
-197
source/libs/executor/inc/operator.h
source/libs/executor/inc/operator.h
+165
-0
source/libs/executor/inc/querytask.h
source/libs/executor/inc/querytask.h
+83
-0
source/libs/executor/src/aggregateoperator.c
source/libs/executor/src/aggregateoperator.c
+2
-5
source/libs/executor/src/cachescanoperator.c
source/libs/executor/src/cachescanoperator.c
+2
-0
source/libs/executor/src/eventwindowoperator.c
source/libs/executor/src/eventwindowoperator.c
+2
-0
source/libs/executor/src/exchangeoperator.c
source/libs/executor/src/exchangeoperator.c
+2
-0
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+1
-0
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+3
-2
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+2
-467
source/libs/executor/src/filloperator.c
source/libs/executor/src/filloperator.c
+3
-1
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+2
-0
source/libs/executor/src/joinoperator.c
source/libs/executor/src/joinoperator.c
+2
-0
source/libs/executor/src/operator.c
source/libs/executor/src/operator.c
+271
-15
source/libs/executor/src/projectoperator.c
source/libs/executor/src/projectoperator.c
+2
-0
source/libs/executor/src/querytask.c
source/libs/executor/src/querytask.c
+240
-0
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+3
-1
source/libs/executor/src/sortoperator.c
source/libs/executor/src/sortoperator.c
+2
-0
source/libs/executor/src/sysscanoperator.c
source/libs/executor/src/sysscanoperator.c
+3
-0
source/libs/executor/src/timesliceoperator.c
source/libs/executor/src/timesliceoperator.c
+2
-0
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+5
-3
source/libs/executor/test/executorTests.cpp
source/libs/executor/test/executorTests.cpp
+1
-4
未找到文件。
source/libs/executor/inc/executil.h
浏览文件 @
7adc2102
...
...
@@ -39,8 +39,6 @@
#define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t))
#define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.str)
typedef
struct
SGroupResInfo
{
int32_t
index
;
SArray
*
pRows
;
// SArray<SResKeyPos>
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
7adc2102
...
...
@@ -85,39 +85,6 @@ typedef struct SLimit {
typedef
struct
STableScanAnalyzeInfo
SFileBlockLoadRecorder
;
typedef
struct
STaskCostInfo
{
int64_t
created
;
int64_t
start
;
uint64_t
elapsedTime
;
double
extractListTime
;
double
groupIdMapTime
;
SFileBlockLoadRecorder
*
pRecoder
;
}
STaskCostInfo
;
typedef
struct
SOperatorCostInfo
{
double
openCost
;
double
totalCost
;
}
SOperatorCostInfo
;
struct
SOperatorInfo
;
typedef
int32_t
(
*
__optr_encode_fn_t
)(
struct
SOperatorInfo
*
pOperator
,
char
**
result
,
int32_t
*
length
);
typedef
int32_t
(
*
__optr_decode_fn_t
)(
struct
SOperatorInfo
*
pOperator
,
char
*
result
);
typedef
int32_t
(
*
__optr_open_fn_t
)(
struct
SOperatorInfo
*
pOptr
);
typedef
SSDataBlock
*
(
*
__optr_fn_t
)(
struct
SOperatorInfo
*
pOptr
);
typedef
void
(
*
__optr_close_fn_t
)(
void
*
param
);
typedef
int32_t
(
*
__optr_explain_fn_t
)(
struct
SOperatorInfo
*
pOptr
,
void
**
pOptrExplain
,
uint32_t
*
len
);
typedef
int32_t
(
*
__optr_reqBuf_fn_t
)(
struct
SOperatorInfo
*
pOptr
);
typedef
struct
STaskIdInfo
{
uint64_t
queryId
;
// this is also a request id
uint64_t
subplanId
;
uint64_t
templateId
;
char
*
str
;
int32_t
vgId
;
}
STaskIdInfo
;
enum
{
STREAM_RECOVER_STEP__NONE
=
0
,
STREAM_RECOVER_STEP__PREPARE1
,
...
...
@@ -156,51 +123,6 @@ typedef struct SExchangeOpStopInfo {
int64_t
refId
;
}
SExchangeOpStopInfo
;
typedef
struct
STaskStopInfo
{
SRWLatch
lock
;
SArray
*
pStopInfo
;
}
STaskStopInfo
;
struct
SExecTaskInfo
{
STaskIdInfo
id
;
uint32_t
status
;
STimeWindow
window
;
STaskCostInfo
cost
;
int64_t
owner
;
// if it is in execution
int32_t
code
;
int32_t
qbufQuota
;
// total available buffer (in KB) during execution query
int64_t
version
;
// used for stream to record wal version, why not move to sschemainfo
SStreamTaskInfo
streamInfo
;
SSchemaInfo
schemaInfo
;
const
char
*
sql
;
// query sql string
jmp_buf
env
;
// jump to this position when error happens.
EOPTR_EXEC_MODEL
execModel
;
// operator execution model [batch model|stream model]
SSubplan
*
pSubplan
;
struct
SOperatorInfo
*
pRoot
;
SLocalFetch
localFetch
;
SArray
*
pResultBlockList
;
// result block list
STaskStopInfo
stopInfo
;
SRWLatch
lock
;
// secure the access of STableListInfo
};
enum
{
OP_NOT_OPENED
=
0x0
,
OP_OPENED
=
0x1
,
OP_RES_TO_RETURN
=
0x5
,
OP_EXEC_DONE
=
0x9
,
};
typedef
struct
SOperatorFpSet
{
__optr_open_fn_t
_openFn
;
// DO NOT invoke this function directly
__optr_fn_t
getNextFn
;
__optr_fn_t
cleanupFn
;
// call this function to release the allocated resources ASAP
__optr_close_fn_t
closeFn
;
__optr_reqBuf_fn_t
reqBufFn
;
// total used buffer for blocking operator
__optr_encode_fn_t
encodeResultRow
;
__optr_decode_fn_t
decodeResultRow
;
__optr_explain_fn_t
getExplainFn
;
}
SOperatorFpSet
;
typedef
struct
SExprSupp
{
SExprInfo
*
pExprInfo
;
int32_t
numOfExprs
;
// the number of scalar expression in group operator
...
...
@@ -209,22 +131,6 @@ typedef struct SExprSupp {
SFilterInfo
*
pFilterInfo
;
}
SExprSupp
;
typedef
struct
SOperatorInfo
{
uint16_t
operatorType
;
int16_t
resultDataBlockId
;
bool
blocking
;
// block operator or not
uint8_t
status
;
// denote if current operator is completed
char
*
name
;
// name, for debug purpose
void
*
info
;
// extension attribution
SExprSupp
exprSupp
;
SExecTaskInfo
*
pTaskInfo
;
SOperatorCostInfo
cost
;
SResultInfo
resultInfo
;
struct
SOperatorInfo
**
pDownstream
;
// downstram pointer list
int32_t
numOfDownstream
;
// number of downstream. The value is always ONE expect for join operator
SOperatorFpSet
fpSet
;
}
SOperatorInfo
;
typedef
enum
{
EX_SOURCE_DATA_NOT_READY
=
0x1
,
EX_SOURCE_DATA_READY
=
0x2
,
...
...
@@ -449,8 +355,8 @@ typedef struct SStreamScanInfo {
SUpdateInfo
*
pUpdateInfo
;
EStreamScanMode
scanMode
;
SOperatorInfo
*
pStreamScanOp
;
SOperatorInfo
*
pTableScanOp
;
struct
SOperatorInfo
*
pStreamScanOp
;
struct
SOperatorInfo
*
pTableScanOp
;
SArray
*
childIds
;
SWindowSupporter
windowSup
;
SPartitionBySupporter
partitionSup
;
...
...
@@ -676,18 +582,8 @@ typedef struct SStreamFillOperatorInfo {
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
SExecTaskInfo
*
doCreateExecTaskInfo
(
uint64_t
queryId
,
uint64_t
taskId
,
int32_t
vgId
,
EOPTR_EXEC_MODEL
model
,
char
*
dbFName
);
SOperatorFpSet
createOperatorFpSet
(
__optr_open_fn_t
openFn
,
__optr_fn_t
nextFn
,
__optr_fn_t
cleanup
,
__optr_close_fn_t
closeFn
,
__optr_reqBuf_fn_t
reqBufFn
,
__optr_explain_fn_t
explain
);
int32_t
optrDummyOpenFn
(
SOperatorInfo
*
pOperator
);
int32_t
appendDownstream
(
SOperatorInfo
*
p
,
SOperatorInfo
**
pDownstream
,
int32_t
num
);
void
setOperatorCompleted
(
SOperatorInfo
*
pOperator
);
void
setOperatorInfo
(
SOperatorInfo
*
pOperator
,
const
char
*
name
,
int32_t
type
,
bool
blocking
,
int32_t
status
,
void
*
pInfo
,
SExecTaskInfo
*
pTaskInfo
);
void
destroyOperatorInfo
(
SOperatorInfo
*
pOperator
);
int32_t
optrDefaultBufFn
(
SOperatorInfo
*
pOperator
);
SSchemaWrapper
*
extractQueriedColumnSchema
(
SScanPhysiNode
*
pScanNode
);
int32_t
initQueriedTableSchemaInfo
(
SReadHandle
*
pHandle
,
SScanPhysiNode
*
pScanNode
,
const
char
*
dbName
,
SExecTaskInfo
*
pTaskInfo
);
void
initBasicInfo
(
SOptrBasicInfo
*
pInfo
,
SSDataBlock
*
pBlock
);
void
cleanupBasicInfo
(
SOptrBasicInfo
*
pInfo
);
...
...
@@ -703,9 +599,9 @@ void cleanupAggSup(SAggSupporter* pAggSup);
void
initResultSizeInfo
(
SResultInfo
*
pResultInfo
,
int32_t
numOfRows
);
void
doBuildStreamResBlock
(
SOperatorInfo
*
pOperator
,
SOptrBasicInfo
*
pbInfo
,
SGroupResInfo
*
pGroupResInfo
,
void
doBuildStreamResBlock
(
struct
SOperatorInfo
*
pOperator
,
SOptrBasicInfo
*
pbInfo
,
SGroupResInfo
*
pGroupResInfo
,
SDiskbasedBuf
*
pBuf
);
void
doBuildResultDatablock
(
SOperatorInfo
*
pOperator
,
SOptrBasicInfo
*
pbInfo
,
SGroupResInfo
*
pGroupResInfo
,
void
doBuildResultDatablock
(
struct
SOperatorInfo
*
pOperator
,
SOptrBasicInfo
*
pbInfo
,
SGroupResInfo
*
pGroupResInfo
,
SDiskbasedBuf
*
pBuf
);
bool
hasLimitOffsetInfo
(
SLimitInfo
*
pLimitInfo
);
...
...
@@ -719,12 +615,10 @@ void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pC
int32_t
extractDataBlockFromFetchRsp
(
SSDataBlock
*
pRes
,
char
*
pData
,
SArray
*
pColList
,
char
**
pNextStart
);
void
updateLoadRemoteInfo
(
SLoadRemoteDataInfo
*
pInfo
,
int64_t
numOfRows
,
int32_t
dataLen
,
int64_t
startTs
,
SOperatorInfo
*
pOperator
);
struct
SOperatorInfo
*
pOperator
);
STimeWindow
getFirstQualifiedTimeWindow
(
int64_t
ts
,
STimeWindow
*
pWindow
,
SInterval
*
pInterval
,
int32_t
order
);
SOperatorInfo
*
extractOperatorInTree
(
SOperatorInfo
*
pOperator
,
int32_t
type
,
const
char
*
id
);
int32_t
getTableScanInfo
(
SOperatorInfo
*
pOperator
,
int32_t
*
order
,
int32_t
*
scanFlag
,
bool
inheritUsOrder
);
int32_t
getBufferPgSize
(
int32_t
rowSize
,
uint32_t
*
defaultPgsz
,
uint32_t
*
defaultBufsz
);
extern
void
doDestroyExchangeOperatorInfo
(
void
*
param
);
...
...
@@ -742,76 +636,6 @@ void clearResultRowInitFlag(SqlFunctionCtx* pCtx, int32_t numOfOutput);
SResultRow
*
doSetResultOutBufByKey
(
SDiskbasedBuf
*
pResultBuf
,
SResultRowInfo
*
pResultRowInfo
,
char
*
pData
,
int16_t
bytes
,
bool
masterscan
,
uint64_t
groupId
,
SExecTaskInfo
*
pTaskInfo
,
bool
isIntervalQuery
,
SAggSupporter
*
pSup
,
bool
keepGroup
);
// operator creater functions
// clang-format off
SOperatorInfo
*
createExchangeOperatorInfo
(
void
*
pTransporter
,
SExchangePhysiNode
*
pExNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createTableScanOperatorInfo
(
STableScanPhysiNode
*
pTableScanNode
,
SReadHandle
*
pHandle
,
STableListInfo
*
pTableList
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createTableMergeScanOperatorInfo
(
STableScanPhysiNode
*
pTableScanNode
,
SReadHandle
*
readHandle
,
STableListInfo
*
pTableListInfo
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createTagScanOperatorInfo
(
SReadHandle
*
pReadHandle
,
STagScanPhysiNode
*
pPhyNode
,
STableListInfo
*
pTableListInfo
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createSysTableScanOperatorInfo
(
void
*
readHandle
,
SSystemTableScanPhysiNode
*
pScanPhyNode
,
const
char
*
pUser
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createTableCountScanOperatorInfo
(
SReadHandle
*
handle
,
STableCountScanPhysiNode
*
pNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createAggregateOperatorInfo
(
SOperatorInfo
*
downstream
,
SAggPhysiNode
*
pNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createIndefinitOutputOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createProjectOperatorInfo
(
SOperatorInfo
*
downstream
,
SProjectPhysiNode
*
pProjPhyNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createSortOperatorInfo
(
SOperatorInfo
*
downstream
,
SSortPhysiNode
*
pSortNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createMultiwayMergeOperatorInfo
(
SOperatorInfo
**
dowStreams
,
size_t
numStreams
,
SMergePhysiNode
*
pMergePhysiNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createCacherowsScanOperator
(
SLastRowScanPhysiNode
*
pTableScanNode
,
SReadHandle
*
readHandle
,
STableListInfo
*
pTableListInfo
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SIntervalPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createMergeIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SMergeIntervalPhysiNode
*
pIntervalPhyNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createMergeAlignedIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SMergeAlignedIntervalPhysiNode
*
pNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStreamFinalIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
,
int32_t
numOfChild
);
SOperatorInfo
*
createSessionAggOperatorInfo
(
SOperatorInfo
*
downstream
,
SSessionWinodwPhysiNode
*
pSessionNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createGroupOperatorInfo
(
SOperatorInfo
*
downstream
,
SAggPhysiNode
*
pAggNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createDataBlockInfoScanOperator
(
SReadHandle
*
readHandle
,
SBlockDistScanPhysiNode
*
pBlockScanNode
,
STableListInfo
*
pTableListInfo
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStreamScanOperatorInfo
(
SReadHandle
*
pHandle
,
STableScanPhysiNode
*
pTableScanNode
,
SNode
*
pTagCond
,
STableListInfo
*
pTableListInfo
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createRawScanOperatorInfo
(
SReadHandle
*
pHandle
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createFillOperatorInfo
(
SOperatorInfo
*
downstream
,
SFillPhysiNode
*
pPhyFillNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStatewindowOperatorInfo
(
SOperatorInfo
*
downstream
,
SStateWinodwPhysiNode
*
pStateNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createPartitionOperatorInfo
(
SOperatorInfo
*
downstream
,
SPartitionPhysiNode
*
pPartNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStreamPartitionOperatorInfo
(
SOperatorInfo
*
downstream
,
SStreamPartitionPhysiNode
*
pPartNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createTimeSliceOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createMergeJoinOperatorInfo
(
SOperatorInfo
**
pDownstream
,
int32_t
numOfDownstream
,
SSortMergeJoinPhysiNode
*
pJoinNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStreamSessionAggOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStreamFinalSessionAggOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
,
int32_t
numOfChild
);
SOperatorInfo
*
createStreamIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStreamStateAggOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStreamFillOperatorInfo
(
SOperatorInfo
*
downstream
,
SStreamFillPhysiNode
*
pPhyFillNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createGroupSortOperatorInfo
(
SOperatorInfo
*
downstream
,
SGroupSortPhysiNode
*
pSortPhyNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createEventwindowOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
physiNode
,
SExecTaskInfo
*
pTaskInfo
);
// clang-format on
int32_t
projectApplyFunctions
(
SExprInfo
*
pExpr
,
SSDataBlock
*
pResult
,
SSDataBlock
*
pSrcBlock
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
SArray
*
pPseudoList
);
...
...
@@ -819,21 +643,11 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
void
setInputDataBlock
(
SExprSupp
*
pExprSupp
,
SSDataBlock
*
pBlock
,
int32_t
order
,
int32_t
scanFlag
,
bool
createDummyCol
);
int32_t
checkForQueryBuf
(
size_t
numOfTables
);
int32_t
stopTableScanOperator
(
SOperatorInfo
*
pOperator
,
const
char
*
pIdStr
);
bool
isTaskKilled
(
SExecTaskInfo
*
pTaskInfo
);
void
setTaskKilled
(
SExecTaskInfo
*
pTaskInfo
,
int32_t
rspCode
);
void
doDestroyTask
(
SExecTaskInfo
*
pTaskInfo
);
void
setTaskStatus
(
SExecTaskInfo
*
pTaskInfo
,
int8_t
status
);
void
buildTaskId
(
uint64_t
taskId
,
uint64_t
queryId
,
char
*
dst
);
SArray
*
getTableListInfo
(
const
SExecTaskInfo
*
pTaskInfo
);
int32_t
createExecTaskInfo
(
SSubplan
*
pPlan
,
SExecTaskInfo
**
pTaskInfo
,
SReadHandle
*
pHandle
,
uint64_t
taskId
,
int32_t
vgId
,
char
*
sql
,
EOPTR_EXEC_MODEL
model
);
int32_t
createDataSinkParam
(
SDataSinkNode
*
pNode
,
void
**
pParam
,
SExecTaskInfo
*
pTask
,
SReadHandle
*
readHandle
);
int32_t
getOperatorExplainExecInfo
(
SOperatorInfo
*
operatorInfo
,
SArray
*
pExecInfoList
);
int32_t
getOperatorExplainExecInfo
(
struct
SOperatorInfo
*
operatorInfo
,
SArray
*
pExecInfoList
);
STimeWindow
getActiveTimeWindow
(
SDiskbasedBuf
*
pBuf
,
SResultRowInfo
*
pResultRowInfo
,
int64_t
ts
,
SInterval
*
pInterval
,
int32_t
order
);
...
...
@@ -855,17 +669,16 @@ int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPos
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
);
bool
groupbyTbname
(
SNodeList
*
pGroupList
);
int32_t
buildDataBlockFromGroupRes
(
SOperatorInfo
*
pOperator
,
SStreamState
*
pState
,
SSDataBlock
*
pBlock
,
SExprSupp
*
pSup
,
int32_t
buildDataBlockFromGroupRes
(
struct
SOperatorInfo
*
pOperator
,
SStreamState
*
pState
,
SSDataBlock
*
pBlock
,
SExprSupp
*
pSup
,
SGroupResInfo
*
pGroupResInfo
);
int32_t
saveSessionDiscBuf
(
SStreamState
*
pState
,
SSessionKey
*
key
,
void
*
buf
,
int32_t
size
);
int32_t
buildSessionResultDataBlock
(
SOperatorInfo
*
pOperator
,
SStreamState
*
pState
,
SSDataBlock
*
pBlock
,
int32_t
buildSessionResultDataBlock
(
struct
SOperatorInfo
*
pOperator
,
SStreamState
*
pState
,
SSDataBlock
*
pBlock
,
SExprSupp
*
pSup
,
SGroupResInfo
*
pGroupResInfo
);
int32_t
setOutputBuf
(
SStreamState
*
pState
,
STimeWindow
*
win
,
SResultRow
**
pResult
,
int64_t
tableGroupId
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
,
int32_t
*
rowEntryInfoOffset
,
SAggSupporter
*
pAggSup
);
int32_t
releaseOutputBuf
(
SStreamState
*
pState
,
SWinKey
*
pKey
,
SResultRow
*
pResult
);
int32_t
saveOutputBuf
(
SStreamState
*
pState
,
SWinKey
*
pKey
,
SResultRow
*
pResult
,
int32_t
resSize
);
void
getNextIntervalWindow
(
SInterval
*
pInterval
,
STimeWindow
*
tw
,
int32_t
order
);
int32_t
qAppendTaskStopInfo
(
SExecTaskInfo
*
pTaskInfo
,
SExchangeOpStopInfo
*
pInfo
);
int32_t
getForwardStepsInBlock
(
int32_t
numOfRows
,
__block_search_fn_t
searchFn
,
TSKEY
ekey
,
int32_t
pos
,
int32_t
order
,
int64_t
*
pData
);
void
appendCreateTableRow
(
SStreamState
*
pState
,
SExprSupp
*
pTableSup
,
SExprSupp
*
pTagSup
,
uint64_t
groupId
,
...
...
source/libs/executor/inc/operator.h
0 → 100644
浏览文件 @
7adc2102
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_OPERATOR_H
#define TDENGINE_OPERATOR_H
#ifdef __cplusplus
extern
"C"
{
#endif
typedef
struct
SOperatorCostInfo
{
double
openCost
;
double
totalCost
;
}
SOperatorCostInfo
;
struct
SOperatorInfo
;
typedef
int32_t
(
*
__optr_encode_fn_t
)(
struct
SOperatorInfo
*
pOperator
,
char
**
result
,
int32_t
*
length
);
typedef
int32_t
(
*
__optr_decode_fn_t
)(
struct
SOperatorInfo
*
pOperator
,
char
*
result
);
typedef
int32_t
(
*
__optr_open_fn_t
)(
struct
SOperatorInfo
*
pOptr
);
typedef
SSDataBlock
*
(
*
__optr_fn_t
)(
struct
SOperatorInfo
*
pOptr
);
typedef
void
(
*
__optr_close_fn_t
)(
void
*
param
);
typedef
int32_t
(
*
__optr_explain_fn_t
)(
struct
SOperatorInfo
*
pOptr
,
void
**
pOptrExplain
,
uint32_t
*
len
);
typedef
int32_t
(
*
__optr_reqBuf_fn_t
)(
struct
SOperatorInfo
*
pOptr
);
typedef
struct
SOperatorFpSet
{
__optr_open_fn_t
_openFn
;
// DO NOT invoke this function directly
__optr_fn_t
getNextFn
;
__optr_fn_t
cleanupFn
;
// call this function to release the allocated resources ASAP
__optr_close_fn_t
closeFn
;
__optr_reqBuf_fn_t
reqBufFn
;
// total used buffer for blocking operator
__optr_encode_fn_t
encodeResultRow
;
__optr_decode_fn_t
decodeResultRow
;
__optr_explain_fn_t
getExplainFn
;
}
SOperatorFpSet
;
enum
{
OP_NOT_OPENED
=
0x0
,
OP_OPENED
=
0x1
,
OP_RES_TO_RETURN
=
0x5
,
OP_EXEC_DONE
=
0x9
,
};
typedef
struct
SOperatorInfo
{
uint16_t
operatorType
;
int16_t
resultDataBlockId
;
bool
blocking
;
// block operator or not
uint8_t
status
;
// denote if current operator is completed
char
*
name
;
// name, for debug purpose
void
*
info
;
// extension attribution
SExprSupp
exprSupp
;
SExecTaskInfo
*
pTaskInfo
;
SOperatorCostInfo
cost
;
SResultInfo
resultInfo
;
struct
SOperatorInfo
**
pDownstream
;
// downstram pointer list
int32_t
numOfDownstream
;
// number of downstream. The value is always ONE expect for join operator
SOperatorFpSet
fpSet
;
}
SOperatorInfo
;
// operator creater functions
// clang-format off
SOperatorInfo
*
createExchangeOperatorInfo
(
void
*
pTransporter
,
SExchangePhysiNode
*
pExNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createTableScanOperatorInfo
(
STableScanPhysiNode
*
pTableScanNode
,
SReadHandle
*
pHandle
,
STableListInfo
*
pTableList
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createTableMergeScanOperatorInfo
(
STableScanPhysiNode
*
pTableScanNode
,
SReadHandle
*
readHandle
,
STableListInfo
*
pTableListInfo
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createTagScanOperatorInfo
(
SReadHandle
*
pReadHandle
,
STagScanPhysiNode
*
pPhyNode
,
STableListInfo
*
pTableListInfo
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createSysTableScanOperatorInfo
(
void
*
readHandle
,
SSystemTableScanPhysiNode
*
pScanPhyNode
,
const
char
*
pUser
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createTableCountScanOperatorInfo
(
SReadHandle
*
handle
,
STableCountScanPhysiNode
*
pNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createAggregateOperatorInfo
(
SOperatorInfo
*
downstream
,
SAggPhysiNode
*
pNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createIndefinitOutputOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createProjectOperatorInfo
(
SOperatorInfo
*
downstream
,
SProjectPhysiNode
*
pProjPhyNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createSortOperatorInfo
(
SOperatorInfo
*
downstream
,
SSortPhysiNode
*
pSortNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createMultiwayMergeOperatorInfo
(
SOperatorInfo
**
dowStreams
,
size_t
numStreams
,
SMergePhysiNode
*
pMergePhysiNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createCacherowsScanOperator
(
SLastRowScanPhysiNode
*
pTableScanNode
,
SReadHandle
*
readHandle
,
STableListInfo
*
pTableListInfo
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SIntervalPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createMergeIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SMergeIntervalPhysiNode
*
pIntervalPhyNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createMergeAlignedIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SMergeAlignedIntervalPhysiNode
*
pNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStreamFinalIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
,
int32_t
numOfChild
);
SOperatorInfo
*
createSessionAggOperatorInfo
(
SOperatorInfo
*
downstream
,
SSessionWinodwPhysiNode
*
pSessionNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createGroupOperatorInfo
(
SOperatorInfo
*
downstream
,
SAggPhysiNode
*
pAggNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createDataBlockInfoScanOperator
(
SReadHandle
*
readHandle
,
SBlockDistScanPhysiNode
*
pBlockScanNode
,
STableListInfo
*
pTableListInfo
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStreamScanOperatorInfo
(
SReadHandle
*
pHandle
,
STableScanPhysiNode
*
pTableScanNode
,
SNode
*
pTagCond
,
STableListInfo
*
pTableListInfo
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createRawScanOperatorInfo
(
SReadHandle
*
pHandle
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createFillOperatorInfo
(
SOperatorInfo
*
downstream
,
SFillPhysiNode
*
pPhyFillNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStatewindowOperatorInfo
(
SOperatorInfo
*
downstream
,
SStateWinodwPhysiNode
*
pStateNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createPartitionOperatorInfo
(
SOperatorInfo
*
downstream
,
SPartitionPhysiNode
*
pPartNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStreamPartitionOperatorInfo
(
SOperatorInfo
*
downstream
,
SStreamPartitionPhysiNode
*
pPartNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createTimeSliceOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createMergeJoinOperatorInfo
(
SOperatorInfo
**
pDownstream
,
int32_t
numOfDownstream
,
SSortMergeJoinPhysiNode
*
pJoinNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStreamSessionAggOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStreamFinalSessionAggOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
,
int32_t
numOfChild
);
SOperatorInfo
*
createStreamIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStreamStateAggOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createStreamFillOperatorInfo
(
SOperatorInfo
*
downstream
,
SStreamFillPhysiNode
*
pPhyFillNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createGroupSortOperatorInfo
(
SOperatorInfo
*
downstream
,
SGroupSortPhysiNode
*
pSortPhyNode
,
SExecTaskInfo
*
pTaskInfo
);
SOperatorInfo
*
createEventwindowOperatorInfo
(
SOperatorInfo
*
downstream
,
SPhysiNode
*
physiNode
,
SExecTaskInfo
*
pTaskInfo
);
// clang-format on
SOperatorFpSet
createOperatorFpSet
(
__optr_open_fn_t
openFn
,
__optr_fn_t
nextFn
,
__optr_fn_t
cleanup
,
__optr_close_fn_t
closeFn
,
__optr_reqBuf_fn_t
reqBufFn
,
__optr_explain_fn_t
explain
);
int32_t
optrDummyOpenFn
(
SOperatorInfo
*
pOperator
);
int32_t
appendDownstream
(
SOperatorInfo
*
p
,
SOperatorInfo
**
pDownstream
,
int32_t
num
);
void
setOperatorCompleted
(
SOperatorInfo
*
pOperator
);
void
setOperatorInfo
(
SOperatorInfo
*
pOperator
,
const
char
*
name
,
int32_t
type
,
bool
blocking
,
int32_t
status
,
void
*
pInfo
,
SExecTaskInfo
*
pTaskInfo
);
int32_t
optrDefaultBufFn
(
SOperatorInfo
*
pOperator
);
SOperatorInfo
*
createOperator
(
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
,
SReadHandle
*
pHandle
,
SNode
*
pTagCond
,
SNode
*
pTagIndexCond
,
const
char
*
pUser
,
const
char
*
dbname
);
void
destroyOperator
(
SOperatorInfo
*
pOperator
);
SOperatorInfo
*
extractOperatorInTree
(
SOperatorInfo
*
pOperator
,
int32_t
type
,
const
char
*
id
);
int32_t
getTableScanInfo
(
SOperatorInfo
*
pOperator
,
int32_t
*
order
,
int32_t
*
scanFlag
,
bool
inheritUsOrder
);
int32_t
stopTableScanOperator
(
SOperatorInfo
*
pOperator
,
const
char
*
pIdStr
);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_OPERATOR_H
\ No newline at end of file
source/libs/executor/inc/querytask.h
0 → 100644
浏览文件 @
7adc2102
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_QUERYTASK_H
#define TDENGINE_QUERYTASK_H
#ifdef __cplusplus
extern
"C"
{
#endif
#define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.str)
typedef
struct
STaskIdInfo
{
uint64_t
queryId
;
// this is also a request id
uint64_t
subplanId
;
uint64_t
templateId
;
char
*
str
;
int32_t
vgId
;
}
STaskIdInfo
;
typedef
struct
STaskCostInfo
{
int64_t
created
;
int64_t
start
;
uint64_t
elapsedTime
;
double
extractListTime
;
double
groupIdMapTime
;
SFileBlockLoadRecorder
*
pRecoder
;
}
STaskCostInfo
;
typedef
struct
STaskStopInfo
{
SRWLatch
lock
;
SArray
*
pStopInfo
;
}
STaskStopInfo
;
struct
SExecTaskInfo
{
STaskIdInfo
id
;
uint32_t
status
;
STimeWindow
window
;
STaskCostInfo
cost
;
int64_t
owner
;
// if it is in execution
int32_t
code
;
int32_t
qbufQuota
;
// total available buffer (in KB) during execution query
int64_t
version
;
// used for stream to record wal version, why not move to sschemainfo
SStreamTaskInfo
streamInfo
;
SSchemaInfo
schemaInfo
;
const
char
*
sql
;
// query sql string
jmp_buf
env
;
// jump to this position when error happens.
EOPTR_EXEC_MODEL
execModel
;
// operator execution model [batch model|stream model]
SSubplan
*
pSubplan
;
struct
SOperatorInfo
*
pRoot
;
SLocalFetch
localFetch
;
SArray
*
pResultBlockList
;
// result block list
STaskStopInfo
stopInfo
;
SRWLatch
lock
;
// secure the access of STableListInfo
};
void
buildTaskId
(
uint64_t
taskId
,
uint64_t
queryId
,
char
*
dst
);
SExecTaskInfo
*
doCreateTask
(
uint64_t
queryId
,
uint64_t
taskId
,
int32_t
vgId
,
EOPTR_EXEC_MODEL
model
,
char
*
dbFName
);
void
doDestroyTask
(
SExecTaskInfo
*
pTaskInfo
);
bool
isTaskKilled
(
SExecTaskInfo
*
pTaskInfo
);
void
setTaskKilled
(
SExecTaskInfo
*
pTaskInfo
,
int32_t
rspCode
);
void
setTaskStatus
(
SExecTaskInfo
*
pTaskInfo
,
int8_t
status
);
int32_t
createExecTaskInfo
(
SSubplan
*
pPlan
,
SExecTaskInfo
**
pTaskInfo
,
SReadHandle
*
pHandle
,
uint64_t
taskId
,
int32_t
vgId
,
char
*
sql
,
EOPTR_EXEC_MODEL
model
);
int32_t
qAppendTaskStopInfo
(
SExecTaskInfo
*
pTaskInfo
,
SExchangeOpStopInfo
*
pInfo
);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_QUERYTASK_H
source/libs/executor/src/aggregateoperator.c
浏览文件 @
7adc2102
...
...
@@ -15,7 +15,6 @@
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
#include "os.h"
#include "querynodes.h"
#include "tfill.h"
...
...
@@ -23,16 +22,14 @@
#include "tdatablock.h"
#include "tglobal.h"
#include "tmsg.h"
#include "ttime.h"
#include "executorimpl.h"
#include "index.h"
#include "query.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
#include "vnode.h"
#include "operator.h"
#include "querytask.h"
typedef
struct
{
bool
hasAgg
;
...
...
source/libs/executor/src/cachescanoperator.c
浏览文件 @
7adc2102
...
...
@@ -24,6 +24,8 @@
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
#include "operator.h"
#include "querytask.h"
typedef
struct
SCacheRowsScanInfo
{
SSDataBlock
*
pRes
;
...
...
source/libs/executor/src/eventwindowoperator.c
浏览文件 @
7adc2102
...
...
@@ -21,6 +21,8 @@
#include "tcompare.h"
#include "tdatablock.h"
#include "ttime.h"
#include "operator.h"
#include "querytask.h"
typedef
struct
SEventWindowOperatorInfo
{
SOptrBasicInfo
binfo
;
...
...
source/libs/executor/src/exchangeoperator.c
浏览文件 @
7adc2102
...
...
@@ -24,6 +24,8 @@
#include "index.h"
#include "query.h"
#include "thash.h"
#include "operator.h"
#include "querytask.h"
typedef
struct
SFetchRspHandleWrapper
{
uint32_t
exchangeId
;
...
...
source/libs/executor/src/executil.c
浏览文件 @
7adc2102
...
...
@@ -26,6 +26,7 @@
#include "executil.h"
#include "executorimpl.h"
#include "tcompression.h"
#include "querytask.h"
typedef
struct
STableListIdInfo
{
uint64_t
suid
;
...
...
source/libs/executor/src/executor.c
浏览文件 @
7adc2102
...
...
@@ -14,13 +14,14 @@
*/
#include "executor.h"
#include <vnode.h>
#include "executorimpl.h"
#include "planner.h"
#include "tdatablock.h"
#include "tref.h"
#include "tudf.h"
#include "vnode.h"
#include "operator.h"
#include "querytask.h"
static
TdThreadOnce
initPoolOnce
=
PTHREAD_ONCE_INIT
;
int32_t
exchangeObjRefPool
=
-
1
;
...
...
@@ -249,7 +250,7 @@ int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks,
qTaskInfo_t
qCreateQueueExecTaskInfo
(
void
*
msg
,
SReadHandle
*
pReaderHandle
,
int32_t
vgId
,
int32_t
*
numOfCols
,
uint64_t
id
)
{
if
(
msg
==
NULL
)
{
// create raw scan
SExecTaskInfo
*
pTaskInfo
=
doCreate
ExecTaskInfo
(
0
,
id
,
vgId
,
OPTR_EXEC_MODEL_QUEUE
,
""
);
SExecTaskInfo
*
pTaskInfo
=
doCreate
Task
(
0
,
id
,
vgId
,
OPTR_EXEC_MODEL_QUEUE
,
""
);
if
(
NULL
==
pTaskInfo
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
7adc2102
此差异已折叠。
点击以展开。
source/libs/executor/src/filloperator.c
浏览文件 @
7adc2102
...
...
@@ -25,11 +25,13 @@
#include "thash.h"
#include "ttime.h"
#include "executorInt.h"
#include "function.h"
#include "querynodes.h"
#include "tdatablock.h"
#include "tfill.h"
#include "operator.h"
#include "querytask.h"
#define FILL_POS_INVALID 0
#define FILL_POS_START 1
...
...
source/libs/executor/src/groupoperator.c
浏览文件 @
7adc2102
...
...
@@ -26,6 +26,8 @@
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
#include "operator.h"
#include "querytask.h"
typedef
struct
SGroupbyOperatorInfo
{
SOptrBasicInfo
binfo
;
...
...
source/libs/executor/src/joinoperator.c
浏览文件 @
7adc2102
...
...
@@ -23,6 +23,8 @@
#include "thash.h"
#include "tmsg.h"
#include "ttypes.h"
#include "operator.h"
#include "querytask.h"
typedef
struct
SJoinRowCtx
{
bool
rowRemains
;
...
...
source/libs/executor/src/operator.c
浏览文件 @
7adc2102
...
...
@@ -15,24 +15,17 @@
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
#include "os.h"
#include "querynodes.h"
#include "tfill.h"
#include "tname.h"
#include "tdatablock.h"
#include "tglobal.h"
#include "tmsg.h"
#include "ttime.h"
#include "executorimpl.h"
#include "index.h"
#include "query.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
#include "vnode.h"
#include "operator.h"
#include "querytask.h"
SOperatorFpSet
createOperatorFpSet
(
__optr_open_fn_t
openFn
,
__optr_fn_t
nextFn
,
__optr_fn_t
cleanup
,
__optr_close_fn_t
closeFn
,
__optr_reqBuf_fn_t
reqBufFn
,
...
...
@@ -82,7 +75,7 @@ void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, b
pOperator
->
pTaskInfo
=
pTaskInfo
;
}
void
destroyOperator
Info
(
SOperatorInfo
*
pOperator
)
{
void
destroyOperator
(
SOperatorInfo
*
pOperator
)
{
if
(
pOperator
==
NULL
)
{
return
;
}
...
...
@@ -93,7 +86,7 @@ void destroyOperatorInfo(SOperatorInfo* pOperator) {
if
(
pOperator
->
pDownstream
!=
NULL
)
{
for
(
int32_t
i
=
0
;
i
<
pOperator
->
numOfDownstream
;
++
i
)
{
destroyOperator
Info
(
pOperator
->
pDownstream
[
i
]);
destroyOperator
(
pOperator
->
pDownstream
[
i
]);
}
taosMemoryFreeClear
(
pOperator
->
pDownstream
);
...
...
@@ -287,7 +280,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
SExtScanInfo
info
=
{.
inheritUsOrder
=
inheritUsOrder
};
STraverParam
p
=
{.
pParam
=
&
info
};
extractScanInfo
(
pOperator
,
&
p
,
NULL
);
traverseOperatorTree
(
pOperator
,
extractScanInfo
,
&
p
,
NULL
);
*
order
=
info
.
order
;
*
scanFlag
=
info
.
scanFlag
;
...
...
@@ -304,10 +297,12 @@ static ERetType doStopDataReader(SOperatorInfo* pOperator, STraverParam* pParam,
return
OPTR_FN_RET_ABORT
;
}
else
if
(
pOperator
->
operatorType
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
SStreamScanInfo
*
pInfo
=
pOperator
->
info
;
STableScanInfo
*
pTableScanInfo
=
pInfo
->
pTableScanOp
->
info
;
if
(
pTableScanInfo
->
base
.
dataReader
!=
NULL
)
{
tsdbReaderSetCloseFlag
(
pTableScanInfo
->
base
.
dataReader
);
if
(
pInfo
->
pTableScanOp
!=
NULL
)
{
STableScanInfo
*
pTableScanInfo
=
pInfo
->
pTableScanOp
->
info
;
if
(
pTableScanInfo
!=
NULL
&&
pTableScanInfo
->
base
.
dataReader
!=
NULL
)
{
tsdbReaderSetCloseFlag
(
pTableScanInfo
->
base
.
dataReader
);
}
}
return
OPTR_FN_RET_ABORT
;
...
...
@@ -321,3 +316,264 @@ int32_t stopTableScanOperator(SOperatorInfo* pOperator, const char* pIdStr) {
traverseOperatorTree
(
pOperator
,
doStopDataReader
,
&
p
,
pIdStr
);
return
p
.
code
;
}
SOperatorInfo
*
createOperator
(
SPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
,
SReadHandle
*
pHandle
,
SNode
*
pTagCond
,
SNode
*
pTagIndexCond
,
const
char
*
pUser
,
const
char
*
dbname
)
{
int32_t
type
=
nodeType
(
pPhyNode
);
const
char
*
idstr
=
GET_TASKID
(
pTaskInfo
);
if
(
pPhyNode
->
pChildren
==
NULL
||
LIST_LENGTH
(
pPhyNode
->
pChildren
)
==
0
)
{
SOperatorInfo
*
pOperator
=
NULL
;
if
(
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
==
type
)
{
STableScanPhysiNode
*
pTableScanNode
=
(
STableScanPhysiNode
*
)
pPhyNode
;
// NOTE: this is an patch to fix the physical plan
// TODO remove it later
if
(
pTableScanNode
->
scan
.
node
.
pLimit
!=
NULL
)
{
pTableScanNode
->
groupSort
=
true
;
}
STableListInfo
*
pTableListInfo
=
tableListCreate
();
int32_t
code
=
createScanTableListInfo
(
&
pTableScanNode
->
scan
,
pTableScanNode
->
pGroupTags
,
pTableScanNode
->
groupSort
,
pHandle
,
pTableListInfo
,
pTagCond
,
pTagIndexCond
,
pTaskInfo
);
if
(
code
)
{
pTaskInfo
->
code
=
code
;
tableListDestroy
(
pTableListInfo
);
qError
(
"failed to createScanTableListInfo, code:%s, %s"
,
tstrerror
(
code
),
idstr
);
return
NULL
;
}
code
=
initQueriedTableSchemaInfo
(
pHandle
,
&
pTableScanNode
->
scan
,
dbname
,
pTaskInfo
);
if
(
code
)
{
pTaskInfo
->
code
=
terrno
;
tableListDestroy
(
pTableListInfo
);
return
NULL
;
}
pOperator
=
createTableScanOperatorInfo
(
pTableScanNode
,
pHandle
,
pTableListInfo
,
pTaskInfo
);
if
(
NULL
==
pOperator
)
{
pTaskInfo
->
code
=
terrno
;
return
NULL
;
}
STableScanInfo
*
pScanInfo
=
pOperator
->
info
;
pTaskInfo
->
cost
.
pRecoder
=
&
pScanInfo
->
base
.
readRecorder
;
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN
==
type
)
{
STableMergeScanPhysiNode
*
pTableScanNode
=
(
STableMergeScanPhysiNode
*
)
pPhyNode
;
STableListInfo
*
pTableListInfo
=
tableListCreate
();
int32_t
code
=
createScanTableListInfo
(
&
pTableScanNode
->
scan
,
pTableScanNode
->
pGroupTags
,
true
,
pHandle
,
pTableListInfo
,
pTagCond
,
pTagIndexCond
,
pTaskInfo
);
if
(
code
)
{
pTaskInfo
->
code
=
code
;
tableListDestroy
(
pTableListInfo
);
qError
(
"failed to createScanTableListInfo, code: %s"
,
tstrerror
(
code
));
return
NULL
;
}
code
=
initQueriedTableSchemaInfo
(
pHandle
,
&
pTableScanNode
->
scan
,
dbname
,
pTaskInfo
);
if
(
code
)
{
pTaskInfo
->
code
=
terrno
;
tableListDestroy
(
pTableListInfo
);
return
NULL
;
}
pOperator
=
createTableMergeScanOperatorInfo
(
pTableScanNode
,
pHandle
,
pTableListInfo
,
pTaskInfo
);
if
(
NULL
==
pOperator
)
{
pTaskInfo
->
code
=
terrno
;
tableListDestroy
(
pTableListInfo
);
return
NULL
;
}
STableScanInfo
*
pScanInfo
=
pOperator
->
info
;
pTaskInfo
->
cost
.
pRecoder
=
&
pScanInfo
->
base
.
readRecorder
;
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
==
type
)
{
pOperator
=
createExchangeOperatorInfo
(
pHandle
?
pHandle
->
pMsgCb
->
clientRpc
:
NULL
,
(
SExchangePhysiNode
*
)
pPhyNode
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
==
type
)
{
STableScanPhysiNode
*
pTableScanNode
=
(
STableScanPhysiNode
*
)
pPhyNode
;
STableListInfo
*
pTableListInfo
=
tableListCreate
();
if
(
pHandle
->
vnode
)
{
int32_t
code
=
createScanTableListInfo
(
&
pTableScanNode
->
scan
,
pTableScanNode
->
pGroupTags
,
pTableScanNode
->
groupSort
,
pHandle
,
pTableListInfo
,
pTagCond
,
pTagIndexCond
,
pTaskInfo
);
if
(
code
)
{
pTaskInfo
->
code
=
code
;
tableListDestroy
(
pTableListInfo
);
qError
(
"failed to createScanTableListInfo, code: %s"
,
tstrerror
(
code
));
return
NULL
;
}
}
pTaskInfo
->
schemaInfo
.
qsw
=
extractQueriedColumnSchema
(
&
pTableScanNode
->
scan
);
pOperator
=
createStreamScanOperatorInfo
(
pHandle
,
pTableScanNode
,
pTagCond
,
pTableListInfo
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN
==
type
)
{
SSystemTableScanPhysiNode
*
pSysScanPhyNode
=
(
SSystemTableScanPhysiNode
*
)
pPhyNode
;
pOperator
=
createSysTableScanOperatorInfo
(
pHandle
,
pSysScanPhyNode
,
pUser
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN
==
type
)
{
STableCountScanPhysiNode
*
pTblCountScanNode
=
(
STableCountScanPhysiNode
*
)
pPhyNode
;
pOperator
=
createTableCountScanOperatorInfo
(
pHandle
,
pTblCountScanNode
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN
==
type
)
{
STagScanPhysiNode
*
pScanPhyNode
=
(
STagScanPhysiNode
*
)
pPhyNode
;
STableListInfo
*
pTableListInfo
=
tableListCreate
();
int32_t
code
=
createScanTableListInfo
(
pScanPhyNode
,
NULL
,
false
,
pHandle
,
pTableListInfo
,
pTagCond
,
pTagIndexCond
,
pTaskInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pTaskInfo
->
code
=
code
;
qError
(
"failed to getTableList, code: %s"
,
tstrerror
(
code
));
return
NULL
;
}
pOperator
=
createTagScanOperatorInfo
(
pHandle
,
pScanPhyNode
,
pTableListInfo
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN
==
type
)
{
SBlockDistScanPhysiNode
*
pBlockNode
=
(
SBlockDistScanPhysiNode
*
)
pPhyNode
;
STableListInfo
*
pTableListInfo
=
tableListCreate
();
if
(
pBlockNode
->
tableType
==
TSDB_SUPER_TABLE
)
{
SArray
*
pList
=
taosArrayInit
(
4
,
sizeof
(
STableKeyInfo
));
int32_t
code
=
vnodeGetAllTableList
(
pHandle
->
vnode
,
pBlockNode
->
uid
,
pList
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pTaskInfo
->
code
=
terrno
;
return
NULL
;
}
size_t
num
=
taosArrayGetSize
(
pList
);
for
(
int32_t
i
=
0
;
i
<
num
;
++
i
)
{
STableKeyInfo
*
p
=
taosArrayGet
(
pList
,
i
);
tableListAddTableInfo
(
pTableListInfo
,
p
->
uid
,
0
);
}
taosArrayDestroy
(
pList
);
}
else
{
// Create group with only one table
tableListAddTableInfo
(
pTableListInfo
,
pBlockNode
->
uid
,
0
);
}
pOperator
=
createDataBlockInfoScanOperator
(
pHandle
,
pBlockNode
,
pTableListInfo
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN
==
type
)
{
SLastRowScanPhysiNode
*
pScanNode
=
(
SLastRowScanPhysiNode
*
)
pPhyNode
;
STableListInfo
*
pTableListInfo
=
tableListCreate
();
int32_t
code
=
createScanTableListInfo
(
&
pScanNode
->
scan
,
pScanNode
->
pGroupTags
,
true
,
pHandle
,
pTableListInfo
,
pTagCond
,
pTagIndexCond
,
pTaskInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pTaskInfo
->
code
=
code
;
return
NULL
;
}
code
=
initQueriedTableSchemaInfo
(
pHandle
,
&
pScanNode
->
scan
,
dbname
,
pTaskInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pTaskInfo
->
code
=
code
;
return
NULL
;
}
pOperator
=
createCacherowsScanOperator
(
pScanNode
,
pHandle
,
pTableListInfo
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_PROJECT
==
type
)
{
pOperator
=
createProjectOperatorInfo
(
NULL
,
(
SProjectPhysiNode
*
)
pPhyNode
,
pTaskInfo
);
}
else
{
terrno
=
TSDB_CODE_INVALID_PARA
;
return
NULL
;
}
if
(
pOperator
!=
NULL
)
{
// todo moved away
pOperator
->
resultDataBlockId
=
pPhyNode
->
pOutputDataBlockDesc
->
dataBlockId
;
}
return
pOperator
;
}
size_t
size
=
LIST_LENGTH
(
pPhyNode
->
pChildren
);
SOperatorInfo
**
ops
=
taosMemoryCalloc
(
size
,
POINTER_BYTES
);
if
(
ops
==
NULL
)
{
return
NULL
;
}
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
SPhysiNode
*
pChildNode
=
(
SPhysiNode
*
)
nodesListGetNode
(
pPhyNode
->
pChildren
,
i
);
ops
[
i
]
=
createOperator
(
pChildNode
,
pTaskInfo
,
pHandle
,
pTagCond
,
pTagIndexCond
,
pUser
,
dbname
);
if
(
ops
[
i
]
==
NULL
)
{
taosMemoryFree
(
ops
);
return
NULL
;
}
}
SOperatorInfo
*
pOptr
=
NULL
;
if
(
QUERY_NODE_PHYSICAL_PLAN_PROJECT
==
type
)
{
pOptr
=
createProjectOperatorInfo
(
ops
[
0
],
(
SProjectPhysiNode
*
)
pPhyNode
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_HASH_AGG
==
type
)
{
SAggPhysiNode
*
pAggNode
=
(
SAggPhysiNode
*
)
pPhyNode
;
if
(
pAggNode
->
pGroupKeys
!=
NULL
)
{
pOptr
=
createGroupOperatorInfo
(
ops
[
0
],
pAggNode
,
pTaskInfo
);
}
else
{
pOptr
=
createAggregateOperatorInfo
(
ops
[
0
],
pAggNode
,
pTaskInfo
);
}
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL
==
type
)
{
SIntervalPhysiNode
*
pIntervalPhyNode
=
(
SIntervalPhysiNode
*
)
pPhyNode
;
pOptr
=
createIntervalOperatorInfo
(
ops
[
0
],
pIntervalPhyNode
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
==
type
)
{
pOptr
=
createStreamIntervalOperatorInfo
(
ops
[
0
],
pPhyNode
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL
==
type
)
{
SMergeAlignedIntervalPhysiNode
*
pIntervalPhyNode
=
(
SMergeAlignedIntervalPhysiNode
*
)
pPhyNode
;
pOptr
=
createMergeAlignedIntervalOperatorInfo
(
ops
[
0
],
pIntervalPhyNode
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL
==
type
)
{
SMergeIntervalPhysiNode
*
pIntervalPhyNode
=
(
SMergeIntervalPhysiNode
*
)
pPhyNode
;
pOptr
=
createMergeIntervalOperatorInfo
(
ops
[
0
],
pIntervalPhyNode
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL
==
type
)
{
int32_t
children
=
0
;
pOptr
=
createStreamFinalIntervalOperatorInfo
(
ops
[
0
],
pPhyNode
,
pTaskInfo
,
children
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL
==
type
)
{
int32_t
children
=
pHandle
->
numOfVgroups
;
pOptr
=
createStreamFinalIntervalOperatorInfo
(
ops
[
0
],
pPhyNode
,
pTaskInfo
,
children
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_SORT
==
type
)
{
pOptr
=
createSortOperatorInfo
(
ops
[
0
],
(
SSortPhysiNode
*
)
pPhyNode
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT
==
type
)
{
pOptr
=
createGroupSortOperatorInfo
(
ops
[
0
],
(
SGroupSortPhysiNode
*
)
pPhyNode
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_MERGE
==
type
)
{
SMergePhysiNode
*
pMergePhyNode
=
(
SMergePhysiNode
*
)
pPhyNode
;
pOptr
=
createMultiwayMergeOperatorInfo
(
ops
,
size
,
pMergePhyNode
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION
==
type
)
{
SSessionWinodwPhysiNode
*
pSessionNode
=
(
SSessionWinodwPhysiNode
*
)
pPhyNode
;
pOptr
=
createSessionAggOperatorInfo
(
ops
[
0
],
pSessionNode
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION
==
type
)
{
pOptr
=
createStreamSessionAggOperatorInfo
(
ops
[
0
],
pPhyNode
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION
==
type
)
{
int32_t
children
=
0
;
pOptr
=
createStreamFinalSessionAggOperatorInfo
(
ops
[
0
],
pPhyNode
,
pTaskInfo
,
children
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION
==
type
)
{
int32_t
children
=
pHandle
->
numOfVgroups
;
pOptr
=
createStreamFinalSessionAggOperatorInfo
(
ops
[
0
],
pPhyNode
,
pTaskInfo
,
children
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_PARTITION
==
type
)
{
pOptr
=
createPartitionOperatorInfo
(
ops
[
0
],
(
SPartitionPhysiNode
*
)
pPhyNode
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION
==
type
)
{
pOptr
=
createStreamPartitionOperatorInfo
(
ops
[
0
],
(
SStreamPartitionPhysiNode
*
)
pPhyNode
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE
==
type
)
{
SStateWinodwPhysiNode
*
pStateNode
=
(
SStateWinodwPhysiNode
*
)
pPhyNode
;
pOptr
=
createStatewindowOperatorInfo
(
ops
[
0
],
pStateNode
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE
==
type
)
{
pOptr
=
createStreamStateAggOperatorInfo
(
ops
[
0
],
pPhyNode
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN
==
type
)
{
pOptr
=
createMergeJoinOperatorInfo
(
ops
,
size
,
(
SSortMergeJoinPhysiNode
*
)
pPhyNode
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_FILL
==
type
)
{
pOptr
=
createFillOperatorInfo
(
ops
[
0
],
(
SFillPhysiNode
*
)
pPhyNode
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL
==
type
)
{
pOptr
=
createStreamFillOperatorInfo
(
ops
[
0
],
(
SStreamFillPhysiNode
*
)
pPhyNode
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC
==
type
)
{
pOptr
=
createIndefinitOutputOperatorInfo
(
ops
[
0
],
pPhyNode
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC
==
type
)
{
pOptr
=
createTimeSliceOperatorInfo
(
ops
[
0
],
pPhyNode
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT
==
type
)
{
pOptr
=
createEventwindowOperatorInfo
(
ops
[
0
],
pPhyNode
,
pTaskInfo
);
}
else
{
terrno
=
TSDB_CODE_INVALID_PARA
;
taosMemoryFree
(
ops
);
return
NULL
;
}
taosMemoryFree
(
ops
);
if
(
pOptr
)
{
pOptr
->
resultDataBlockId
=
pPhyNode
->
pOutputDataBlockDesc
->
dataBlockId
;
}
return
pOptr
;
}
source/libs/executor/src/projectoperator.c
浏览文件 @
7adc2102
...
...
@@ -16,6 +16,8 @@
#include "executorimpl.h"
#include "filter.h"
#include "functionMgt.h"
#include "operator.h"
#include "querytask.h"
typedef
struct
SProjectOperatorInfo
{
SOptrBasicInfo
binfo
;
...
...
source/libs/executor/src/querytask.c
0 → 100644
浏览文件 @
7adc2102
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
#include "os.h"
#include "querynodes.h"
#include "tfill.h"
#include "tname.h"
#include "tdatablock.h"
#include "tmsg.h"
#include "executorimpl.h"
#include "index.h"
#include "query.h"
#include "thash.h"
#include "ttypes.h"
#include "vnode.h"
#include "operator.h"
#include "querytask.h"
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
SExecTaskInfo
*
doCreateTask
(
uint64_t
queryId
,
uint64_t
taskId
,
int32_t
vgId
,
EOPTR_EXEC_MODEL
model
,
char
*
dbFName
)
{
SExecTaskInfo
*
pTaskInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SExecTaskInfo
));
if
(
pTaskInfo
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
setTaskStatus
(
pTaskInfo
,
TASK_NOT_COMPLETED
);
pTaskInfo
->
cost
.
created
=
taosGetTimestampUs
();
pTaskInfo
->
schemaInfo
.
dbname
=
taosStrdup
(
dbFName
);
pTaskInfo
->
execModel
=
model
;
pTaskInfo
->
stopInfo
.
pStopInfo
=
taosArrayInit
(
4
,
sizeof
(
SExchangeOpStopInfo
));
pTaskInfo
->
pResultBlockList
=
taosArrayInit
(
128
,
POINTER_BYTES
);
taosInitRWLatch
(
&
pTaskInfo
->
lock
);
pTaskInfo
->
id
.
vgId
=
vgId
;
pTaskInfo
->
id
.
queryId
=
queryId
;
pTaskInfo
->
id
.
str
=
taosMemoryMalloc
(
64
);
buildTaskId
(
taskId
,
queryId
,
pTaskInfo
->
id
.
str
);
return
pTaskInfo
;
}
bool
isTaskKilled
(
SExecTaskInfo
*
pTaskInfo
)
{
return
(
0
!=
pTaskInfo
->
code
);
}
void
setTaskKilled
(
SExecTaskInfo
*
pTaskInfo
,
int32_t
rspCode
)
{
pTaskInfo
->
code
=
rspCode
;
stopTableScanOperator
(
pTaskInfo
->
pRoot
,
pTaskInfo
->
id
.
str
);
}
void
setTaskStatus
(
SExecTaskInfo
*
pTaskInfo
,
int8_t
status
)
{
if
(
status
==
TASK_NOT_COMPLETED
)
{
pTaskInfo
->
status
=
status
;
}
else
{
// QUERY_NOT_COMPLETED is not compatible with any other status, so clear its position first
CLEAR_QUERY_STATUS
(
pTaskInfo
,
TASK_NOT_COMPLETED
);
pTaskInfo
->
status
|=
status
;
}
}
int32_t
createExecTaskInfo
(
SSubplan
*
pPlan
,
SExecTaskInfo
**
pTaskInfo
,
SReadHandle
*
pHandle
,
uint64_t
taskId
,
int32_t
vgId
,
char
*
sql
,
EOPTR_EXEC_MODEL
model
)
{
*
pTaskInfo
=
doCreateTask
(
pPlan
->
id
.
queryId
,
taskId
,
vgId
,
model
,
pPlan
->
dbFName
);
if
(
*
pTaskInfo
==
NULL
)
{
goto
_complete
;
}
if
(
pHandle
)
{
if
(
pHandle
->
pStateBackend
)
{
(
*
pTaskInfo
)
->
streamInfo
.
pState
=
pHandle
->
pStateBackend
;
}
}
(
*
pTaskInfo
)
->
sql
=
sql
;
sql
=
NULL
;
(
*
pTaskInfo
)
->
pSubplan
=
pPlan
;
(
*
pTaskInfo
)
->
pRoot
=
createOperator
(
pPlan
->
pNode
,
*
pTaskInfo
,
pHandle
,
pPlan
->
pTagCond
,
pPlan
->
pTagIndexCond
,
pPlan
->
user
,
pPlan
->
dbFName
);
if
(
NULL
==
(
*
pTaskInfo
)
->
pRoot
)
{
terrno
=
(
*
pTaskInfo
)
->
code
;
goto
_complete
;
}
return
TSDB_CODE_SUCCESS
;
_complete:
taosMemoryFree
(
sql
);
doDestroyTask
(
*
pTaskInfo
);
return
terrno
;
}
void
cleanupQueriedTableScanInfo
(
SSchemaInfo
*
pSchemaInfo
)
{
taosMemoryFreeClear
(
pSchemaInfo
->
dbname
);
taosMemoryFreeClear
(
pSchemaInfo
->
tablename
);
tDeleteSSchemaWrapper
(
pSchemaInfo
->
sw
);
tDeleteSSchemaWrapper
(
pSchemaInfo
->
qsw
);
}
int32_t
initQueriedTableSchemaInfo
(
SReadHandle
*
pHandle
,
SScanPhysiNode
*
pScanNode
,
const
char
*
dbName
,
SExecTaskInfo
*
pTaskInfo
)
{
SMetaReader
mr
=
{
0
};
if
(
pHandle
==
NULL
)
{
terrno
=
TSDB_CODE_INVALID_PARA
;
return
terrno
;
}
metaReaderInit
(
&
mr
,
pHandle
->
meta
,
0
);
int32_t
code
=
metaGetTableEntryByUidCache
(
&
mr
,
pScanNode
->
uid
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"failed to get the table meta, uid:0x%"
PRIx64
", suid:0x%"
PRIx64
", %s"
,
pScanNode
->
uid
,
pScanNode
->
suid
,
GET_TASKID
(
pTaskInfo
));
metaReaderClear
(
&
mr
);
return
terrno
;
}
SSchemaInfo
*
pSchemaInfo
=
&
pTaskInfo
->
schemaInfo
;
pSchemaInfo
->
tablename
=
taosStrdup
(
mr
.
me
.
name
);
pSchemaInfo
->
dbname
=
taosStrdup
(
dbName
);
if
(
mr
.
me
.
type
==
TSDB_SUPER_TABLE
)
{
pSchemaInfo
->
sw
=
tCloneSSchemaWrapper
(
&
mr
.
me
.
stbEntry
.
schemaRow
);
pSchemaInfo
->
tversion
=
mr
.
me
.
stbEntry
.
schemaTag
.
version
;
}
else
if
(
mr
.
me
.
type
==
TSDB_CHILD_TABLE
)
{
tDecoderClear
(
&
mr
.
coder
);
tb_uid_t
suid
=
mr
.
me
.
ctbEntry
.
suid
;
code
=
metaGetTableEntryByUidCache
(
&
mr
,
suid
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
metaReaderClear
(
&
mr
);
return
terrno
;
}
pSchemaInfo
->
sw
=
tCloneSSchemaWrapper
(
&
mr
.
me
.
stbEntry
.
schemaRow
);
pSchemaInfo
->
tversion
=
mr
.
me
.
stbEntry
.
schemaTag
.
version
;
}
else
{
pSchemaInfo
->
sw
=
tCloneSSchemaWrapper
(
&
mr
.
me
.
ntbEntry
.
schemaRow
);
}
metaReaderClear
(
&
mr
);
pSchemaInfo
->
qsw
=
extractQueriedColumnSchema
(
pScanNode
);
return
TSDB_CODE_SUCCESS
;
}
SSchemaWrapper
*
extractQueriedColumnSchema
(
SScanPhysiNode
*
pScanNode
)
{
int32_t
numOfCols
=
LIST_LENGTH
(
pScanNode
->
pScanCols
);
int32_t
numOfTags
=
LIST_LENGTH
(
pScanNode
->
pScanPseudoCols
);
SSchemaWrapper
*
pqSw
=
taosMemoryCalloc
(
1
,
sizeof
(
SSchemaWrapper
));
pqSw
->
pSchema
=
taosMemoryCalloc
(
numOfCols
+
numOfTags
,
sizeof
(
SSchema
));
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
STargetNode
*
pNode
=
(
STargetNode
*
)
nodesListGetNode
(
pScanNode
->
pScanCols
,
i
);
SColumnNode
*
pColNode
=
(
SColumnNode
*
)
pNode
->
pExpr
;
SSchema
*
pSchema
=
&
pqSw
->
pSchema
[
pqSw
->
nCols
++
];
pSchema
->
colId
=
pColNode
->
colId
;
pSchema
->
type
=
pColNode
->
node
.
resType
.
type
;
pSchema
->
bytes
=
pColNode
->
node
.
resType
.
bytes
;
tstrncpy
(
pSchema
->
name
,
pColNode
->
colName
,
tListLen
(
pSchema
->
name
));
}
// this the tags and pseudo function columns, we only keep the tag columns
for
(
int32_t
i
=
0
;
i
<
numOfTags
;
++
i
)
{
STargetNode
*
pNode
=
(
STargetNode
*
)
nodesListGetNode
(
pScanNode
->
pScanPseudoCols
,
i
);
int32_t
type
=
nodeType
(
pNode
->
pExpr
);
if
(
type
==
QUERY_NODE_COLUMN
)
{
SColumnNode
*
pColNode
=
(
SColumnNode
*
)
pNode
->
pExpr
;
SSchema
*
pSchema
=
&
pqSw
->
pSchema
[
pqSw
->
nCols
++
];
pSchema
->
colId
=
pColNode
->
colId
;
pSchema
->
type
=
pColNode
->
node
.
resType
.
type
;
pSchema
->
bytes
=
pColNode
->
node
.
resType
.
bytes
;
tstrncpy
(
pSchema
->
name
,
pColNode
->
colName
,
tListLen
(
pSchema
->
name
));
}
}
return
pqSw
;
}
static
void
cleanupStreamInfo
(
SStreamTaskInfo
*
pStreamInfo
)
{
tDeleteSSchemaWrapper
(
pStreamInfo
->
schema
);
}
static
void
freeBlock
(
void
*
pParam
)
{
SSDataBlock
*
pBlock
=
*
(
SSDataBlock
**
)
pParam
;
blockDataDestroy
(
pBlock
);
}
void
doDestroyTask
(
SExecTaskInfo
*
pTaskInfo
)
{
qDebug
(
"%s execTask is freed"
,
GET_TASKID
(
pTaskInfo
));
destroyOperator
(
pTaskInfo
->
pRoot
);
cleanupQueriedTableScanInfo
(
&
pTaskInfo
->
schemaInfo
);
cleanupStreamInfo
(
&
pTaskInfo
->
streamInfo
);
if
(
!
pTaskInfo
->
localFetch
.
localExec
)
{
nodesDestroyNode
((
SNode
*
)
pTaskInfo
->
pSubplan
);
}
taosArrayDestroyEx
(
pTaskInfo
->
pResultBlockList
,
freeBlock
);
taosArrayDestroy
(
pTaskInfo
->
stopInfo
.
pStopInfo
);
taosMemoryFreeClear
(
pTaskInfo
->
sql
);
taosMemoryFreeClear
(
pTaskInfo
->
id
.
str
);
taosMemoryFreeClear
(
pTaskInfo
);
}
void
buildTaskId
(
uint64_t
taskId
,
uint64_t
queryId
,
char
*
dst
)
{
char
*
p
=
dst
;
int32_t
offset
=
6
;
memcpy
(
p
,
"TID:0x"
,
offset
);
offset
+=
tintToHex
(
taskId
,
&
p
[
offset
]);
memcpy
(
&
p
[
offset
],
" QID:0x"
,
7
);
offset
+=
7
;
offset
+=
tintToHex
(
queryId
,
&
p
[
offset
]);
p
[
offset
]
=
0
;
}
source/libs/executor/src/scanoperator.c
浏览文件 @
7adc2102
...
...
@@ -30,6 +30,8 @@
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
#include "operator.h"
#include "querytask.h"
int32_t
scanDebug
=
0
;
...
...
@@ -2304,7 +2306,7 @@ static void destroyStreamScanOperatorInfo(void* param) {
SStreamScanInfo
*
pStreamScan
=
(
SStreamScanInfo
*
)
param
;
if
(
pStreamScan
->
pTableScanOp
&&
pStreamScan
->
pTableScanOp
->
info
)
{
destroyOperator
Info
(
pStreamScan
->
pTableScanOp
);
destroyOperator
(
pStreamScan
->
pTableScanOp
);
}
if
(
pStreamScan
->
tqReader
)
{
...
...
source/libs/executor/src/sortoperator.c
浏览文件 @
7adc2102
...
...
@@ -16,6 +16,8 @@
#include "filter.h"
#include "executorimpl.h"
#include "tdatablock.h"
#include "operator.h"
#include "querytask.h"
typedef
struct
SSortOperatorInfo
{
SOptrBasicInfo
binfo
;
...
...
source/libs/executor/src/sysscanoperator.c
浏览文件 @
7adc2102
...
...
@@ -31,6 +31,9 @@
#include "thash.h"
#include "ttypes.h"
#include "vnode.h"
#include "operator.h"
#include "querytask.h"
typedef
int
(
*
__optSysFilter
)(
void
*
a
,
void
*
b
,
int16_t
dtype
);
typedef
int32_t
(
*
__sys_filte
)(
void
*
pMeta
,
SNode
*
cond
,
SArray
*
result
);
...
...
source/libs/executor/src/timesliceoperator.c
浏览文件 @
7adc2102
...
...
@@ -21,6 +21,8 @@
#include "tdatablock.h"
#include "tfill.h"
#include "ttime.h"
#include "operator.h"
#include "querytask.h"
typedef
struct
STimeSliceOperatorInfo
{
SSDataBlock
*
pRes
;
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
7adc2102
...
...
@@ -21,6 +21,8 @@
#include "tdatablock.h"
#include "tfill.h"
#include "ttime.h"
#include "operator.h"
#include "querytask.h"
#define IS_FINAL_OP(op) ((op)->isFinal)
#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL);
...
...
@@ -1606,7 +1608,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
int32_t
size
=
taosArrayGetSize
(
pInfo
->
pChildren
);
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
SOperatorInfo
*
pChildOp
=
taosArrayGetP
(
pInfo
->
pChildren
,
i
);
destroyOperator
Info
(
pChildOp
);
destroyOperator
(
pChildOp
);
}
taosArrayDestroy
(
pInfo
->
pChildren
);
}
...
...
@@ -2835,7 +2837,7 @@ void destroyStreamSessionAggOperatorInfo(void* param) {
int32_t
size
=
taosArrayGetSize
(
pInfo
->
pChildren
);
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
SOperatorInfo
*
pChild
=
taosArrayGetP
(
pInfo
->
pChildren
,
i
);
destroyOperator
Info
(
pChild
);
destroyOperator
(
pChild
);
}
taosArrayDestroy
(
pInfo
->
pChildren
);
}
...
...
@@ -3812,7 +3814,7 @@ void destroyStreamStateOperatorInfo(void* param) {
int32_t
size
=
taosArrayGetSize
(
pInfo
->
pChildren
);
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
SOperatorInfo
*
pChild
=
taosArrayGetP
(
pInfo
->
pChildren
,
i
);
destroyOperator
Info
(
pChild
);
destroyOperator
(
pChild
);
}
taosArrayDestroy
(
pInfo
->
pChildren
);
}
...
...
source/libs/executor/test/executorTests.cpp
浏览文件 @
7adc2102
...
...
@@ -29,11 +29,8 @@
#include "taos.h"
#include "tdatablock.h"
#include "tdef.h"
#include "tglobal.h"
#include "tmsg.h"
#include "tname.h"
#include "trpc.h"
#include "tvariant.h"
#include "operator.h"
namespace
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录