Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
ad3075db
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
ad3075db
编写于
5月 03, 2022
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor: do some internal refactor.
上级
90202c72
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
137 addition
and
166 deletion
+137
-166
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+4
-5
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+11
-10
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+2
-2
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+115
-144
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+5
-5
未找到文件。
source/libs/executor/inc/executorimpl.h
浏览文件 @
ad3075db
...
...
@@ -268,7 +268,7 @@ typedef struct SOperatorFpSet {
typedef
struct
SOperatorInfo
{
uint8_t
operatorType
;
bool
blocking
Optr
;
// block operator or not
bool
blocking
;
// block operator or not
uint8_t
status
;
// denote if current operator is completed
int32_t
numOfOutput
;
// number of columns of the current operator results
char
*
name
;
// name, used to show the query execution plan
...
...
@@ -341,9 +341,9 @@ typedef struct STableScanInfo {
int64_t
elapsedTime
;
int32_t
prevGroupId
;
// previous table group id
SScanInfo
scanInfo
;
int32_t
current
;
SNode
*
pFilterNode
;
// filter
operator info
SqlFunctionCtx
*
pCtx
;
//
next
operator query context
int32_t
scanTimes
;
SNode
*
pFilterNode
;
// filter
info, which is push down by optimizer
SqlFunctionCtx
*
pCtx
;
//
which belongs to the direct upstream operator
operator query context
SResultRowInfo
*
pResultRowInfo
;
int32_t
*
rowCellInfoOffset
;
SExprInfo
*
pExpr
;
...
...
@@ -397,7 +397,6 @@ typedef struct SSysTableScanInfo {
SArray
*
scanCols
;
// SArray<int16_t> scan column id list
SName
name
;
SSDataBlock
*
pRes
;
int32_t
capacity
;
int64_t
numOfBlocks
;
// extract basic running information.
SLoadRemoteDataInfo
loadInfo
;
}
SSysTableScanInfo
;
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
ad3075db
...
...
@@ -132,6 +132,7 @@ void doSetOperatorCompleted(SOperatorInfo* pOperator) {
int32_t
operatorDummyOpenFn
(
SOperatorInfo
*
pOperator
)
{
OPTR_SET_OPENED
(
pOperator
);
pOperator
->
cost
.
openCost
=
0
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -3282,7 +3283,7 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock
pOperator
->
name
=
"ExchangeOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
;
pOperator
->
blocking
Optr
=
false
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
numOfOutput
=
pBlock
->
info
.
numOfCols
;
...
...
@@ -3673,7 +3674,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
pOperator
->
name
=
"SortedMerge"
;
// pOperator->operatorType = OP_SortedMerge;
pOperator
->
blocking
Optr
=
true
;
pOperator
->
blocking
=
true
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
numOfOutput
=
num
;
...
...
@@ -3756,7 +3757,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR
pOperator
->
name
=
"SortOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_SORT
;
pOperator
->
blocking
Optr
=
true
;
pOperator
->
blocking
=
true
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
...
...
@@ -4419,7 +4420,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pOperator
->
name
=
"TableAggregate"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_AGG
;
pOperator
->
blocking
Optr
=
true
;
pOperator
->
blocking
=
true
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
pExpr
=
pExprInfo
;
...
...
@@ -4532,7 +4533,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p
pOperator
->
name
=
"ProjectOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_PROJECT
;
pOperator
->
blocking
Optr
=
false
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
pExpr
=
pExprInfo
;
...
...
@@ -4615,7 +4616,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp
}
pOperator
->
name
=
"FillOperator"
;
pOperator
->
blocking
Optr
=
false
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_FILL
;
pOperator
->
pExpr
=
pExpr
;
...
...
@@ -4861,7 +4862,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SSDataBlock
*
pResBlock
=
createResDataBlock
(
pScanPhyNode
->
node
.
pOutputDataBlockDesc
);
SQueryTableDataCond
cond
=
{
0
};
int32_t
code
=
initQueryTableDataCond
(
&
cond
,
pTableScanNode
);
int32_t
code
=
initQueryTableDataCond
(
&
cond
,
pTableScanNode
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
NULL
;
}
...
...
@@ -4877,8 +4879,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
==
type
)
{
SScanPhysiNode
*
pScanPhyNode
=
(
SScanPhysiNode
*
)
pPhyNode
;
// simple child table.
int32_t
code
=
doCreateTableGroup
(
pHandle
->
meta
,
pScanPhyNode
->
tableType
,
pScanPhyNode
->
uid
,
pTableGroupInfo
,
queryId
,
taskId
);
int32_t
code
=
doCreateTableGroup
(
pHandle
->
meta
,
pScanPhyNode
->
tableType
,
pScanPhyNode
->
uid
,
pTableGroupInfo
,
queryId
,
taskId
);
SArray
*
tableIdList
=
extractTableIdList
(
pTableGroupInfo
);
SSDataBlock
*
pResBlock
=
createResDataBlock
(
pScanPhyNode
->
node
.
pOutputDataBlockDesc
);
...
...
@@ -5628,7 +5629,7 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOf
pOperator
->
name
=
"JoinOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_JOIN
;
pOperator
->
blocking
Optr
=
false
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
pExpr
=
pExprInfo
;
pOperator
->
numOfOutput
=
numOfCols
;
...
...
source/libs/executor/src/groupoperator.c
浏览文件 @
ad3075db
...
...
@@ -353,7 +353,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
,
8
);
pOperator
->
name
=
"GroupbyAggOperator"
;
pOperator
->
blocking
Optr
=
true
;
pOperator
->
blocking
=
true
;
pOperator
->
status
=
OP_NOT_OPENED
;
// pOperator->operatorType = OP_Groupby;
pOperator
->
pExpr
=
pExprInfo
;
...
...
@@ -612,7 +612,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo*
}
pOperator
->
name
=
"PartitionOperator"
;
pOperator
->
blocking
Optr
=
true
;
pOperator
->
blocking
=
true
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_PARTITION
;
pInfo
->
binfo
.
pRes
=
pResultBlock
;
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
ad3075db
...
...
@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "
common/
ttime.h"
#include "ttime.h"
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
...
...
@@ -36,10 +36,10 @@
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
int32_t
buildSysDbTableInfo
(
const
SSysTableScanInfo
*
pInfo
);
static
int32_t
buildSysDbTableInfo
(
const
SSysTableScanInfo
*
pInfo
,
int32_t
capacity
);
static
int32_t
buildDbTableInfoBlock
(
const
SSDataBlock
*
p
,
const
SSysTableMeta
*
pSysDbTableMeta
,
size_t
size
,
const
char
*
dbName
);
int32_t
buildDbTableInfoBlock
(
const
SSDataBlock
*
p
,
const
SSysTableMeta
*
pSysDbTableMeta
,
size_t
size
,
const
char
*
dbName
);
void
switchCtxOrder
(
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
)
{
static
void
switchCtxOrder
(
SqlFunctionCtx
*
pCtx
,
int32_t
numOfOutput
)
{
for
(
int32_t
i
=
0
;
i
<
numOfOutput
;
++
i
)
{
SWITCH_ORDER
(
pCtx
[
i
].
order
);
}
...
...
@@ -158,7 +158,7 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn
return
false
;
}
int32_t
loadDataBlock
(
SOperatorInfo
*
pOperator
,
STableScanInfo
*
pTableScanInfo
,
SSDataBlock
*
pBlock
,
uint32_t
*
status
)
{
static
int32_t
loadDataBlock
(
SOperatorInfo
*
pOperator
,
STableScanInfo
*
pTableScanInfo
,
SSDataBlock
*
pBlock
,
uint32_t
*
status
)
{
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
STableScanInfo
*
pInfo
=
pOperator
->
info
;
...
...
@@ -296,15 +296,16 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
return
NULL
;
}
while
(
pTableScanInfo
->
current
<
pTableScanInfo
->
scanInfo
.
numOfAsc
)
{
// do the ascending order traverse in the first place.
while
(
pTableScanInfo
->
scanTimes
<
pTableScanInfo
->
scanInfo
.
numOfAsc
)
{
SSDataBlock
*
p
=
doTableScanImpl
(
pOperator
);
if
(
p
!=
NULL
)
{
return
p
;
}
pTableScanInfo
->
current
+=
1
;
pTableScanInfo
->
scanTimes
+=
1
;
if
(
pTableScanInfo
->
current
<
pTableScanInfo
->
scanInfo
.
numOfAsc
)
{
if
(
pTableScanInfo
->
scanTimes
<
pTableScanInfo
->
scanInfo
.
numOfAsc
)
{
setTaskStatus
(
pTaskInfo
,
TASK_NOT_COMPLETED
);
pTableScanInfo
->
scanFlag
=
REPEAT_SCAN
;
...
...
@@ -319,7 +320,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
}
int32_t
total
=
pTableScanInfo
->
scanInfo
.
numOfAsc
+
pTableScanInfo
->
scanInfo
.
numOfDesc
;
if
(
pTableScanInfo
->
current
<
total
)
{
if
(
pTableScanInfo
->
scanTimes
<
total
)
{
if
(
pTableScanInfo
->
cond
.
order
==
TSDB_ORDER_ASC
)
{
prepareForDescendingScan
(
pTableScanInfo
,
pTableScanInfo
->
pCtx
,
pTableScanInfo
->
numOfOutput
);
tsdbResetReadHandle
(
pTableScanInfo
->
dataReader
,
&
pTableScanInfo
->
cond
);
...
...
@@ -329,21 +330,20 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
qDebug
(
"%s start to descending order scan data blocks due to query func required, qrange:%"
PRId64
"-%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
pWin
->
skey
,
pWin
->
ekey
);
while
(
pTableScanInfo
->
current
<
total
)
{
while
(
pTableScanInfo
->
scanTimes
<
total
)
{
SSDataBlock
*
p
=
doTableScanImpl
(
pOperator
);
if
(
p
!=
NULL
)
{
return
p
;
}
pTableScanInfo
->
current
+=
1
;
pTableScanInfo
->
scanTimes
+=
1
;
if
(
pTableScanInfo
->
current
<
pTableScanInfo
->
scanInfo
.
numOfAsc
)
{
if
(
pTableScanInfo
->
scanTimes
<
pTableScanInfo
->
scanInfo
.
numOfAsc
)
{
setTaskStatus
(
pTaskInfo
,
TASK_NOT_COMPLETED
);
pTableScanInfo
->
scanFlag
=
REPEAT_SCAN
;
qDebug
(
"%s start to repeat descending order scan data blocks due to query func required, qrange:%"
PRId64
"-%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
pTaskInfo
->
window
.
skey
,
pTaskInfo
->
window
.
ekey
);
"-%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
pTaskInfo
->
window
.
skey
,
pTaskInfo
->
window
.
ekey
);
// do prepare for the next round table scan operation
tsdbResetReadHandle
(
pTableScanInfo
->
dataReader
,
&
pTableScanInfo
->
cond
);
...
...
@@ -372,48 +372,50 @@ SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCon
pInfo
->
cond
=
*
pCond
;
pInfo
->
scanInfo
=
(
SScanInfo
){.
numOfAsc
=
scanInfo
[
0
],
.
numOfDesc
=
scanInfo
[
1
]};
pInfo
->
interval
=
*
pInterval
;
pInfo
->
sampleRatio
=
sampleRatio
;
pInfo
->
interval
=
*
pInterval
;
pInfo
->
sampleRatio
=
sampleRatio
;
pInfo
->
dataBlockLoadFlag
=
dataLoadFlag
;
pInfo
->
pResBlock
=
pResBlock
;
pInfo
->
pFilterNode
=
pCondition
;
pInfo
->
dataReader
=
pDataReader
;
pInfo
->
current
=
0
;
pInfo
->
scanFlag
=
MAIN_SCAN
;
pInfo
->
pColMatchInfo
=
pColMatchInfo
;
pOperator
->
name
=
"TableScanOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
;
pOperator
->
blockingOptr
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
numOfOutput
=
numOfOutput
;
pOperator
->
fpSet
.
getNextFn
=
doTableScan
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pInfo
->
pResBlock
=
pResBlock
;
pInfo
->
pFilterNode
=
pCondition
;
pInfo
->
dataReader
=
pDataReader
;
pInfo
->
scanFlag
=
MAIN_SCAN
;
pInfo
->
pColMatchInfo
=
pColMatchInfo
;
pOperator
->
name
=
"TableScanOperator"
;
// for dubug purpose
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
numOfOutput
=
numOfOutput
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doTableScan
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
)
;
static
int32_t
cost
=
0
;
pOperator
->
cost
.
openCost
=
++
cost
;
// for non-blocking operator, the open cost is always 0
pOperator
->
cost
.
openCost
=
0
;
pOperator
->
cost
.
totalCost
=
++
cost
;
pOperator
->
resultInfo
.
totalRows
=
++
cost
;
return
pOperator
;
}
SOperatorInfo
*
createTableSeqScanOperatorInfo
(
void
*
p
TsdbReadHandle
)
{
SOperatorInfo
*
createTableSeqScanOperatorInfo
(
void
*
p
ReadHandle
,
SExecTaskInfo
*
pTaskInfo
)
{
STableScanInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
STableScanInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
pInfo
->
dataReader
=
pTsdbReadHandle
;
pInfo
->
current
=
0
;
pInfo
->
prevGroupId
=
-
1
;
pInfo
->
dataReader
=
pReadHandle
;
pInfo
->
prevGroupId
=
-
1
;
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
pOperator
->
name
=
"TableSeqScanOperator"
;
pOperator
->
name
=
"TableSeqScanOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN
;
pOperator
->
blocking
Optr
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
fpSet
.
getNextFn
=
doTableScanImpl
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doTableScanImpl
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
,
NULL
);
return
pOperator
;
}
...
...
@@ -482,7 +484,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo*
pOperator
->
name
=
"DataBlockInfoScanOperator"
;
// pOperator->operatorType = OP_TableBlockInfoScan;
pOperator
->
blocking
Optr
=
false
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
fpSet
.
_openFn
=
operatorDummyOpenFn
;
pOperator
->
fpSet
.
getNextFn
=
doBlockInfoScan
;
...
...
@@ -636,20 +638,22 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock*
}
pInfo
->
readerHandle
=
streamReadHandle
;
pInfo
->
pRes
=
pResBlock
;
pInfo
->
pCondition
=
pCondition
;
pOperator
->
name
=
"StreamBlockScanOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
;
pOperator
->
blocking
Optr
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
numOfOutput
=
pResBlock
->
info
.
numOfCols
;
pOperator
->
fpSet
.
_openFn
=
operatorDummyOpenFn
;
pInfo
->
pRes
=
pResBlock
;
pInfo
->
pCondition
=
pCondition
;
pOperator
->
name
=
"StreamBlockScanOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
numOfOutput
=
pResBlock
->
info
.
numOfCols
;
pOperator
->
fpSet
.
_openFn
=
operatorDummyOpenFn
;
pOperator
->
fpSet
.
getNextFn
=
doStreamBlockScan
;
pOperator
->
fpSet
.
closeFn
=
operatorDummyCloseFn
;
pOperator
->
fpSet
.
closeFn
=
operatorDummyCloseFn
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doStreamBlockScan
,
NULL
,
NULL
,
operatorDummyCloseFn
,
NULL
,
NULL
,
NULL
);
return
pOperator
;
}
...
...
@@ -671,14 +675,12 @@ EDealRes getDBNameFromConditionWalker(SNode* pNode, void* pContext) {
switch
(
nType
)
{
case
QUERY_NODE_OPERATOR
:
{
SOperatorNode
*
node
=
(
SOperatorNode
*
)
pNode
;
if
(
OP_TYPE_EQUAL
==
node
->
opType
)
{
*
(
int32_t
*
)
pContext
=
1
;
return
DEAL_RES_CONTINUE
;
}
*
(
int32_t
*
)
pContext
=
0
;
return
DEAL_RES_IGNORE_CHILD
;
}
case
QUERY_NODE_COLUMN
:
{
...
...
@@ -709,19 +711,17 @@ EDealRes getDBNameFromConditionWalker(SNode* pNode, void* pContext) {
default:
break
;
}
return
DEAL_RES_CONTINUE
;
}
void
getDBNameFromCondition
(
SNode
*
pCondition
,
char
*
dbName
)
{
static
void
getDBNameFromCondition
(
SNode
*
pCondition
,
const
char
*
dbName
)
{
if
(
NULL
==
pCondition
)
{
return
;
}
nodesWalkExpr
(
pCondition
,
getDBNameFromConditionWalker
,
dbName
);
nodesWalkExpr
(
pCondition
,
getDBNameFromConditionWalker
,
(
char
*
)
dbName
);
}
static
int32_t
loadSysTableC
ontentCb
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
static
int32_t
loadSysTableC
allback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
SOperatorInfo
*
operator
=
(
SOperatorInfo
*
)
param
;
SSysTableScanInfo
*
pScanResInfo
=
(
SSysTableScanInfo
*
)
operator
->
info
;
if
(
TSDB_CODE_SUCCESS
==
code
)
{
...
...
@@ -746,13 +746,14 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
}
SFilterInfo
*
filter
=
NULL
;
int32_t
code
=
filterInitFromNode
(
pInfo
->
pCondition
,
&
filter
,
0
);
int32_t
code
=
filterInitFromNode
(
pInfo
->
pCondition
,
&
filter
,
0
);
SFilterColumnParam
param1
=
{.
numOfCols
=
pInfo
->
pRes
->
info
.
numOfCols
,
.
pDataBlock
=
pInfo
->
pRes
->
pDataBlock
};
code
=
filterSetDataFromSlotId
(
filter
,
&
param1
);
int8_t
*
rowRes
=
NULL
;
bool
keep
=
filterExecute
(
filter
,
pInfo
->
pRes
,
&
rowRes
,
NULL
,
param1
.
numOfCols
);
bool
keep
=
filterExecute
(
filter
,
pInfo
->
pRes
,
&
rowRes
,
NULL
,
param1
.
numOfCols
);
filterFreeInfo
(
filter
);
SSDataBlock
*
px
=
createOneDataBlock
(
pInfo
->
pRes
,
false
);
...
...
@@ -795,61 +796,31 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
static
SSDataBlock
*
buildSysTableMetaBlock
()
{
SSDataBlock
*
pBlock
=
taosMemoryCalloc
(
1
,
sizeof
(
SSDataBlock
));
pBlock
->
info
.
numOfCols
=
10
;
pBlock
->
info
.
hasVarCol
=
true
;
size_t
size
=
0
;
const
SSysTableMeta
*
pMeta
=
NULL
;
getInfosDbMeta
(
&
pMeta
,
&
size
);
int32_t
index
=
0
;
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
if
(
strcmp
(
pMeta
[
i
].
name
,
TSDB_INS_TABLE_USER_TABLES
)
==
0
)
{
index
=
i
;
break
;
}
}
pBlock
->
pDataBlock
=
taosArrayInit
(
pBlock
->
info
.
numOfCols
,
sizeof
(
SColumnInfoData
));
SColumnInfoData
colInfoData
=
{
0
};
colInfoData
.
info
.
colId
=
1
;
colInfoData
.
info
.
type
=
TSDB_DATA_TYPE_VARCHAR
;
colInfoData
.
info
.
bytes
=
(
TSDB_TABLE_NAME_LEN
-
1
)
+
VARSTR_HEADER_SIZE
;
taosArrayPush
(
pBlock
->
pDataBlock
,
&
colInfoData
);
colInfoData
.
info
.
colId
=
2
;
colInfoData
.
info
.
type
=
TSDB_DATA_TYPE_VARCHAR
;
colInfoData
.
info
.
bytes
=
(
TSDB_DB_NAME_LEN
-
1
)
+
VARSTR_HEADER_SIZE
;
taosArrayPush
(
pBlock
->
pDataBlock
,
&
colInfoData
);
colInfoData
.
info
.
colId
=
3
;
colInfoData
.
info
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
colInfoData
.
info
.
bytes
=
8
;
taosArrayPush
(
pBlock
->
pDataBlock
,
&
colInfoData
);
colInfoData
.
info
.
colId
=
4
;
colInfoData
.
info
.
type
=
TSDB_DATA_TYPE_INT
;
colInfoData
.
info
.
bytes
=
4
;
taosArrayPush
(
pBlock
->
pDataBlock
,
&
colInfoData
);
colInfoData
.
info
.
colId
=
5
;
colInfoData
.
info
.
type
=
TSDB_DATA_TYPE_VARCHAR
;
colInfoData
.
info
.
bytes
=
(
TSDB_TABLE_NAME_LEN
-
1
)
+
VARSTR_HEADER_SIZE
;
taosArrayPush
(
pBlock
->
pDataBlock
,
&
colInfoData
);
colInfoData
.
info
.
colId
=
6
;
colInfoData
.
info
.
type
=
TSDB_DATA_TYPE_BIGINT
;
colInfoData
.
info
.
bytes
=
8
;
taosArrayPush
(
pBlock
->
pDataBlock
,
&
colInfoData
);
colInfoData
.
info
.
colId
=
7
;
colInfoData
.
info
.
type
=
TSDB_DATA_TYPE_INT
;
colInfoData
.
info
.
bytes
=
4
;
taosArrayPush
(
pBlock
->
pDataBlock
,
&
colInfoData
);
colInfoData
.
info
.
colId
=
8
;
colInfoData
.
info
.
type
=
TSDB_DATA_TYPE_INT
;
colInfoData
.
info
.
bytes
=
4
;
taosArrayPush
(
pBlock
->
pDataBlock
,
&
colInfoData
);
colInfoData
.
info
.
colId
=
9
;
colInfoData
.
info
.
type
=
TSDB_DATA_TYPE_VARCHAR
;
colInfoData
.
info
.
bytes
=
512
+
VARSTR_HEADER_SIZE
;
taosArrayPush
(
pBlock
->
pDataBlock
,
&
colInfoData
);
colInfoData
.
info
.
colId
=
10
;
colInfoData
.
info
.
type
=
TSDB_DATA_TYPE_VARCHAR
;
colInfoData
.
info
.
bytes
=
20
+
VARSTR_HEADER_SIZE
;
taosArrayPush
(
pBlock
->
pDataBlock
,
&
colInfoData
);
for
(
int32_t
i
=
0
;
i
<
pMeta
[
index
].
colNum
;
++
i
)
{
SColumnInfoData
colInfoData
=
{
0
};
colInfoData
.
info
.
colId
=
i
+
1
;
colInfoData
.
info
.
type
=
pMeta
[
index
].
schema
[
i
].
type
;
colInfoData
.
info
.
bytes
=
pMeta
[
index
].
schema
[
i
].
bytes
;
taosArrayPush
(
pBlock
->
pDataBlock
,
&
colInfoData
);
}
pBlock
->
info
.
numOfCols
=
pMeta
[
index
].
colNum
;
pBlock
->
info
.
hasVarCol
=
true
;
return
pBlock
;
}
...
...
@@ -868,7 +839,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
return
NULL
;
}
buildSysDbTableInfo
(
pInfo
);
buildSysDbTableInfo
(
pInfo
,
pOperator
->
resultInfo
.
capacity
);
doFilterResult
(
pInfo
);
pInfo
->
loadInfo
.
totalRows
+=
pInfo
->
pRes
->
info
.
rows
;
...
...
@@ -896,7 +867,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
varDataSetLen
(
dbname
,
strlen
(
varDataVal
(
dbname
)));
SSDataBlock
*
p
=
buildSysTableMetaBlock
();
blockDataEnsureCapacity
(
p
,
p
Info
->
capacity
);
blockDataEnsureCapacity
(
p
,
p
Operator
->
resultInfo
.
capacity
);
char
n
[
TSDB_TABLE_NAME_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
while
(
metaTbCursorNext
(
pInfo
->
pCur
)
==
0
)
{
...
...
@@ -977,7 +948,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
pColInfoData
=
taosArrayGet
(
p
->
pDataBlock
,
9
);
colDataAppend
(
pColInfoData
,
numOfRows
,
str
,
false
);
if
(
++
numOfRows
>=
p
Info
->
capacity
)
{
if
(
++
numOfRows
>=
p
Operator
->
resultInfo
.
capacity
)
{
break
;
}
}
...
...
@@ -1022,7 +993,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
pMsgSendInfo
->
msgInfo
.
pData
=
buf1
;
pMsgSendInfo
->
msgInfo
.
len
=
contLen
;
pMsgSendInfo
->
msgType
=
TDMT_MND_SYSTABLE_RETRIEVE
;
pMsgSendInfo
->
fp
=
loadSysTableC
ontentCb
;
pMsgSendInfo
->
fp
=
loadSysTableC
allback
;
int64_t
transporterId
=
0
;
int32_t
code
=
asyncSendMsgToServer
(
pInfo
->
pTransporter
,
&
pInfo
->
epSet
,
&
transporterId
,
pMsgSendInfo
);
...
...
@@ -1060,9 +1031,9 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
}
}
int32_t
buildSysDbTableInfo
(
const
SSysTableScanInfo
*
pInfo
)
{
int32_t
buildSysDbTableInfo
(
const
SSysTableScanInfo
*
pInfo
,
int32_t
capacity
)
{
SSDataBlock
*
p
=
buildSysTableMetaBlock
();
blockDataEnsureCapacity
(
p
,
pInfo
->
capacity
);
blockDataEnsureCapacity
(
p
,
capacity
);
size_t
size
=
0
;
const
SSysTableMeta
*
pSysDbTableMeta
=
NULL
;
...
...
@@ -1133,18 +1104,19 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe
return
NULL
;
}
pInfo
->
accountId
=
accountId
;
pInfo
->
accountId
=
accountId
;
pInfo
->
showRewrite
=
showRewrite
;
pInfo
->
pRes
=
pResBlock
;
pInfo
->
capacity
=
4096
;
pInfo
->
pCondition
=
pCondition
;
pInfo
->
scanCols
=
colList
;
pInfo
->
pRes
=
pResBlock
;
pInfo
->
pCondition
=
pCondition
;
pInfo
->
scanCols
=
colList
;
initResultSizeInfo
(
pOperator
,
4096
);
tNameAssign
(
&
pInfo
->
name
,
pName
);
const
char
*
name
=
tNameGetTableName
(
&
pInfo
->
name
);
if
(
strncasecmp
(
name
,
TSDB_INS_TABLE_USER_TABLES
,
TSDB_TABLE_FNAME_LEN
)
==
0
)
{
pInfo
->
readHandle
=
*
(
SReadHandle
*
)
readHandle
;
blockDataEnsureCapacity
(
pInfo
->
pRes
,
p
Info
->
capacity
);
blockDataEnsureCapacity
(
pInfo
->
pRes
,
p
Operator
->
resultInfo
.
capacity
);
}
else
{
tsem_init
(
&
pInfo
->
ready
,
0
,
0
);
pInfo
->
epSet
=
epset
;
...
...
@@ -1173,12 +1145,12 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe
#endif
}
pOperator
->
name
=
"SysTableScanOperator"
;
pOperator
->
name
=
"SysTableScanOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN
;
pOperator
->
blocking
Optr
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
numOfOutput
=
pResBlock
->
info
.
numOfCols
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
numOfOutput
=
pResBlock
->
info
.
numOfCols
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doSysTableScan
,
NULL
,
NULL
,
destroySysScanOperator
,
NULL
,
NULL
,
NULL
);
pOperator
->
pTaskInfo
=
pTaskInfo
;
...
...
@@ -1188,7 +1160,6 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe
static
SSDataBlock
*
doTagScan
(
SOperatorInfo
*
pOperator
)
{
#if 0
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
...
...
@@ -1322,29 +1293,29 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
pInfo
->
pRes
=
blockDataDestroy
(
pInfo
->
pRes
);
}
SOperatorInfo
*
createTagScanOperatorInfo
(
void
*
pReaderHandle
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
SExecTaskInfo
*
pTaskInfo
)
{
SOperatorInfo
*
createTagScanOperatorInfo
(
void
*
readHandle
,
SExprInfo
*
pExpr
,
int32_t
numOfOutput
,
SExecTaskInfo
*
pTaskInfo
)
{
STagScanInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
STagScanInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
goto
_error
;
}
pInfo
->
pReader
=
pReader
Handle
;
pInfo
->
curPos
=
0
;
pOperator
->
name
=
"TagScanOperator"
;
pInfo
->
pReader
=
read
Handle
;
pInfo
->
curPos
=
0
;
pOperator
->
name
=
"TagScanOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN
;
pOperator
->
blockingOptr
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
pOperator
->
pExpr
=
pExpr
;
pOperator
->
numOfOutput
=
numOfOutput
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
pOperator
->
fpSet
=
createOperatorFpSet
(
operatorDummyOpenFn
,
doTagScan
,
NULL
,
NULL
,
destroyTagScanOperatorInfo
,
NULL
,
NULL
,
NULL
);
pOperator
->
pExpr
=
pExpr
;
pOperator
->
numOfOutput
=
numOfOutput
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
return
pOperator
;
_error:
taosMemoryFree
(
pInfo
);
taosMemoryFree
(
pOperator
);
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
ad3075db
...
...
@@ -1078,7 +1078,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pOperator
->
name
=
"TimeIntervalAggOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
;
pOperator
->
blocking
Optr
=
true
;
pOperator
->
blocking
=
true
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
pExpr
=
pExprInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
...
...
@@ -1137,7 +1137,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr
pOperator
->
name
=
"StreamTimeIntervalAggOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_INTERVAL
;
pOperator
->
blocking
Optr
=
true
;
pOperator
->
blocking
=
true
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
pExpr
=
pExprInfo
;
pOperator
->
pTaskInfo
=
pTaskInfo
;
...
...
@@ -1343,7 +1343,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pOperator
->
name
=
"TimeSliceOperator"
;
// pOperator->operatorType = OP_AllTimeWindow;
pOperator
->
blocking
Optr
=
true
;
pOperator
->
blocking
=
true
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
pExpr
=
pExprInfo
;
pOperator
->
numOfOutput
=
numOfCols
;
...
...
@@ -1385,7 +1385,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
pInfo
->
tsSlotId
=
tsSlotId
;
pOperator
->
name
=
"StateWindowOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW
;
pOperator
->
blocking
Optr
=
true
;
pOperator
->
blocking
=
true
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
pExpr
=
pExpr
;
pOperator
->
numOfOutput
=
numOfCols
;
...
...
@@ -1437,7 +1437,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
pInfo
->
reptScan
=
false
;
pOperator
->
name
=
"SessionWindowAggOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW
;
pOperator
->
blocking
Optr
=
true
;
pOperator
->
blocking
=
true
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
pExpr
=
pExprInfo
;
pOperator
->
numOfOutput
=
numOfCols
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录