Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
2055879d
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
2055879d
编写于
11月 18, 2022
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor: do some internal refactor.
上级
786caafa
变更
5
展开全部
隐藏空白更改
内联
并排
Showing
5 changed file
with
142 addition
and
260 deletion
+142
-260
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+13
-18
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+11
-11
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+8
-8
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+108
-221
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+2
-2
未找到文件。
source/libs/executor/inc/executorimpl.h
浏览文件 @
2055879d
...
...
@@ -315,26 +315,31 @@ typedef struct STableMetaCacheInfo {
uint64_t
cacheHit
;
}
STableMetaCacheInfo
;
typedef
struct
STableScan
Info
{
typedef
struct
STableScan
Base
{
STsdbReader
*
dataReader
;
SFileBlockLoadRecorder
readRecorder
;
SQueryTableDataCond
cond
;
SAggOptrPushDownInfo
pdInfo
;
SColMatchInfo
matchInfo
;
SReadHandle
readHandle
;
SExprSupp
pseudoSup
;
STableMetaCacheInfo
metaCache
;
int32_t
scanFlag
;
// table scan flag to denote if it is a repeat/reverse/main scan
}
STableScanBase
;
typedef
struct
STableScanInfo
{
STableScanBase
base
;
SLimitInfo
limitInfo
;
SFileBlockLoadRecorder
readRecorder
;
SScanInfo
scanInfo
;
int32_t
scanTimes
;
SSDataBlock
*
pResBlock
;
SColMatchInfo
matchInfo
;
SExprSupp
pseudoSup
;
SQueryTableDataCond
cond
;
int32_t
scanFlag
;
// table scan flag to denote if it is a repeat/reverse/main scan
int32_t
dataBlockLoadFlag
;
SSampleExecInfo
sample
;
// sample execution info
int32_t
currentGroupId
;
int32_t
currentTable
;
int8_t
scanMode
;
SAggOptrPushDownInfo
pdInfo
;
int8_t
assignBlockUid
;
STableMetaCacheInfo
metaCache
;
}
STableScanInfo
;
typedef
struct
STableMergeScanInfo
{
...
...
@@ -344,8 +349,7 @@ typedef struct STableMergeScanInfo {
bool
hasGroupId
;
uint64_t
groupId
;
SArray
*
queryConds
;
// array of queryTableDataCond
STsdbReader
*
pReader
;
SReadHandle
readHandle
;
STableScanBase
base
;
int32_t
bufPageSize
;
uint32_t
sortBufSize
;
// max buffer size for in-memory sort
SArray
*
pSortInfo
;
...
...
@@ -354,20 +358,11 @@ typedef struct STableMergeScanInfo {
int64_t
startTs
;
// sort start time
SArray
*
sortSourceParams
;
SLimitInfo
limitInfo
;
SFileBlockLoadRecorder
readRecorder
;
int64_t
numOfRows
;
SScanInfo
scanInfo
;
int32_t
scanTimes
;
SqlFunctionCtx
*
pCtx
;
// which belongs to the direct upstream operator operator query context
SResultRowInfo
*
pResultRowInfo
;
int32_t
*
rowEntryInfoOffset
;
SExprInfo
*
pExpr
;
SSDataBlock
*
pResBlock
;
SColMatchInfo
matchInfo
;
int32_t
numOfOutput
;
SExprSupp
pseudoSup
;
SQueryTableDataCond
cond
;
int32_t
scanFlag
;
// table scan flag to denote if it is a repeat/reverse/main scan
int32_t
dataBlockLoadFlag
;
// if the upstream is an interval operator, the interval info is also kept here to get the time
...
...
source/libs/executor/src/executor.c
浏览文件 @
2055879d
...
...
@@ -1026,8 +1026,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
SStreamScanInfo
*
pInfo
=
pOperator
->
info
;
if
(
pOffset
->
type
==
TMQ_OFFSET__LOG
)
{
STableScanInfo
*
pTSInfo
=
pInfo
->
pTableScanOp
->
info
;
tsdbReaderClose
(
pTSInfo
->
dataReader
);
pTSInfo
->
dataReader
=
NULL
;
tsdbReaderClose
(
pTSInfo
->
base
.
dataReader
);
pTSInfo
->
base
.
dataReader
=
NULL
;
#if 0
if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus) &&
pInfo->tqReader->pWalReader->curVersion != pOffset->version) {
...
...
@@ -1079,23 +1079,23 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
// TODO after dropping table, table may not found
ASSERT
(
found
);
if
(
pTableScanInfo
->
dataReader
==
NULL
)
{
if
(
pTableScanInfo
->
base
.
dataReader
==
NULL
)
{
STableKeyInfo
*
pList
=
tableListGetInfo
(
pTaskInfo
->
pTableInfoList
,
0
);
int32_t
num
=
tableListGetSize
(
pTaskInfo
->
pTableInfoList
);
if
(
tsdbReaderOpen
(
pTableScanInfo
->
readHandle
.
vnode
,
&
pTableScanInfo
->
cond
,
pList
,
num
,
&
pTableScanInfo
->
dataReader
,
NULL
)
<
0
||
pTableScanInfo
->
dataReader
==
NULL
)
{
if
(
tsdbReaderOpen
(
pTableScanInfo
->
base
.
readHandle
.
vnode
,
&
pTableScanInfo
->
base
.
cond
,
pList
,
num
,
&
pTableScanInfo
->
base
.
dataReader
,
NULL
)
<
0
||
pTableScanInfo
->
base
.
dataReader
==
NULL
)
{
ASSERT
(
0
);
}
}
STableKeyInfo
tki
=
{.
uid
=
uid
};
tsdbSetTableList
(
pTableScanInfo
->
dataReader
,
&
tki
,
1
);
int64_t
oldSkey
=
pTableScanInfo
->
cond
.
twindows
.
skey
;
pTableScanInfo
->
cond
.
twindows
.
skey
=
ts
+
1
;
tsdbReaderReset
(
pTableScanInfo
->
dataReader
,
&
pTableScanInfo
->
cond
);
pTableScanInfo
->
cond
.
twindows
.
skey
=
oldSkey
;
tsdbSetTableList
(
pTableScanInfo
->
base
.
dataReader
,
&
tki
,
1
);
int64_t
oldSkey
=
pTableScanInfo
->
base
.
cond
.
twindows
.
skey
;
pTableScanInfo
->
base
.
cond
.
twindows
.
skey
=
ts
+
1
;
tsdbReaderReset
(
pTableScanInfo
->
base
.
dataReader
,
&
pTableScanInfo
->
base
.
cond
);
pTableScanInfo
->
base
.
cond
.
twindows
.
skey
=
oldSkey
;
pTableScanInfo
->
scanTimes
=
0
;
qDebug
(
"tsdb reader offset seek to uid %"
PRId64
" ts %"
PRId64
", table cur set to %d , all table num %d"
,
uid
,
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
2055879d
...
...
@@ -1725,13 +1725,13 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
return
TSDB_CODE_SUCCESS
;
}
else
if
(
type
==
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
)
{
STableScanInfo
*
pTableScanInfo
=
pOperator
->
info
;
*
order
=
pTableScanInfo
->
cond
.
order
;
*
scanFlag
=
pTableScanInfo
->
scanFlag
;
*
order
=
pTableScanInfo
->
base
.
cond
.
order
;
*
scanFlag
=
pTableScanInfo
->
base
.
scanFlag
;
return
TSDB_CODE_SUCCESS
;
}
else
if
(
type
==
QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN
)
{
STableMergeScanInfo
*
pTableScanInfo
=
pOperator
->
info
;
*
order
=
pTableScanInfo
->
cond
.
order
;
*
scanFlag
=
pTableScanInfo
->
scanFlag
;
*
order
=
pTableScanInfo
->
base
.
cond
.
order
;
*
scanFlag
=
pTableScanInfo
->
base
.
scanFlag
;
return
TSDB_CODE_SUCCESS
;
}
else
{
if
(
pOperator
->
pDownstream
==
NULL
||
pOperator
->
pDownstream
[
0
]
==
NULL
)
{
...
...
@@ -2378,8 +2378,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN
if
(
downstream
->
operatorType
==
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
)
{
STableScanInfo
*
pTableScanInfo
=
downstream
->
info
;
pTableScanInfo
->
pdInfo
.
pExprSup
=
&
pOperator
->
exprSupp
;
pTableScanInfo
->
pdInfo
.
pAggSup
=
&
pInfo
->
aggSup
;
pTableScanInfo
->
base
.
pdInfo
.
pExprSup
=
&
pOperator
->
exprSupp
;
pTableScanInfo
->
base
.
pdInfo
.
pAggSup
=
&
pInfo
->
aggSup
;
}
code
=
appendDownstream
(
pOperator
,
&
downstream
,
1
);
...
...
@@ -2744,7 +2744,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
}
STableScanInfo
*
pScanInfo
=
pOperator
->
info
;
pTaskInfo
->
cost
.
pRecoder
=
&
pScanInfo
->
readRecorder
;
pTaskInfo
->
cost
.
pRecoder
=
&
pScanInfo
->
base
.
readRecorder
;
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN
==
type
)
{
STableMergeScanPhysiNode
*
pTableScanNode
=
(
STableMergeScanPhysiNode
*
)
pPhyNode
;
...
...
@@ -2769,7 +2769,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
}
STableScanInfo
*
pScanInfo
=
pOperator
->
info
;
pTaskInfo
->
cost
.
pRecoder
=
&
pScanInfo
->
readRecorder
;
pTaskInfo
->
cost
.
pRecoder
=
&
pScanInfo
->
base
.
readRecorder
;
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
==
type
)
{
pOperator
=
createExchangeOperatorInfo
(
pHandle
?
pHandle
->
pMsgCb
->
clientRpc
:
NULL
,
(
SExchangePhysiNode
*
)
pPhyNode
,
pTaskInfo
);
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
2055879d
此差异已折叠。
点击以展开。
source/libs/executor/src/timewindowoperator.c
浏览文件 @
2055879d
...
...
@@ -2549,8 +2549,8 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
pInfo
->
current
=
pInfo
->
win
.
skey
;
STableScanInfo
*
pScanInfo
=
(
STableScanInfo
*
)
downstream
->
info
;
pScanInfo
->
cond
.
twindows
=
pInfo
->
win
;
pScanInfo
->
cond
.
type
=
TIMEWINDOW_RANGE_EXTERNAL
;
pScanInfo
->
base
.
cond
.
twindows
=
pInfo
->
win
;
pScanInfo
->
base
.
cond
.
type
=
TIMEWINDOW_RANGE_EXTERNAL
;
setOperatorInfo
(
pOperator
,
"TimeSliceOperator"
,
QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录