Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d3a05cc2
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
d3a05cc2
编写于
7月 04, 2022
作者:
L
Liu Jicong
提交者:
GitHub
7月 04, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #14525 from taosdata/feature/stream
fix(tmq): prepare scan should reset operator status
上级
4e092384
8127887d
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
17 addition
and
9 deletion
+17
-9
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+17
-9
未找到文件。
source/libs/executor/src/executorimpl.c
浏览文件 @
d3a05cc2
...
...
@@ -2843,11 +2843,18 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
int32_t
doPrepareScan
(
SOperatorInfo
*
pOperator
,
uint64_t
uid
,
int64_t
ts
)
{
int32_t
type
=
pOperator
->
operatorType
;
pOperator
->
status
=
OP_OPENED
;
if
(
type
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
SStreamBlockScanInfo
*
pScanInfo
=
pOperator
->
info
;
pScanInfo
->
blockType
=
STREAM_INPUT__DATA_SCAN
;
pScanInfo
->
pSnapshotReadOp
->
status
=
OP_OPENED
;
STableScanInfo
*
pInfo
=
pScanInfo
->
pSnapshotReadOp
->
info
;
ASSERT
(
pInfo
->
scanMode
==
TABLE_SCAN__TABLE_ORDER
);
if
(
uid
==
0
)
{
pInfo
->
noTable
=
1
;
return
TSDB_CODE_SUCCESS
;
...
...
@@ -2861,14 +2868,6 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
pInfo
->
noTable
=
0
;
if
(
pInfo
->
lastStatus
.
uid
!=
uid
||
pInfo
->
lastStatus
.
ts
!=
ts
)
{
tsdbSetTableId
(
pInfo
->
dataReader
,
uid
);
int64_t
oldSkey
=
pInfo
->
cond
.
twindows
[
0
].
skey
;
pInfo
->
cond
.
twindows
[
0
].
skey
=
ts
+
1
;
tsdbResetReadHandle
(
pInfo
->
dataReader
,
&
pInfo
->
cond
,
0
);
pInfo
->
cond
.
twindows
[
0
].
skey
=
oldSkey
;
pInfo
->
scanTimes
=
0
;
pInfo
->
curTWinIdx
=
0
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
int32_t
tableSz
=
taosArrayGetSize
(
pTaskInfo
->
tableqinfoList
.
pTableList
);
...
...
@@ -2880,8 +2879,17 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
pInfo
->
currentTable
=
i
;
}
}
// TODO after processing drop,
// TODO after processing drop,
found can be false
ASSERT
(
found
);
tsdbSetTableId
(
pInfo
->
dataReader
,
uid
);
int64_t
oldSkey
=
pInfo
->
cond
.
twindows
[
0
].
skey
;
pInfo
->
cond
.
twindows
[
0
].
skey
=
ts
+
1
;
tsdbResetReadHandle
(
pInfo
->
dataReader
,
&
pInfo
->
cond
,
0
);
pInfo
->
cond
.
twindows
[
0
].
skey
=
oldSkey
;
pInfo
->
scanTimes
=
0
;
pInfo
->
curTWinIdx
=
0
;
qDebug
(
"tsdb reader offset seek to uid %ld ts %ld, table cur set to %d , all table num %d"
,
uid
,
ts
,
pInfo
->
currentTable
,
tableSz
);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录