Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
0bd25489
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
0bd25489
编写于
7月 29, 2023
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix(stream): initialize the filter window initial range.
上级
f6e07d1f
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
101 addition
and
94 deletion
+101
-94
source/dnode/snode/src/snode.c
source/dnode/snode/src/snode.c
+3
-3
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+3
-3
source/dnode/vnode/src/tq/tqRestore.c
source/dnode/vnode/src/tq/tqRestore.c
+2
-3
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+1
-0
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+91
-84
source/libs/stream/src/stream.c
source/libs/stream/src/stream.c
+1
-1
未找到文件。
source/dnode/snode/src/snode.c
浏览文件 @
0bd25489
...
...
@@ -346,9 +346,9 @@ int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) {
rsp
.
status
=
streamTaskCheckStatus
(
pTask
);
streamMetaReleaseTask
(
pSnode
->
pMeta
,
pTask
);
qDebug
(
"s-task:%s recv task check req(reqId:0x%"
PRIx64
") task:0x%x (vgId:%d), status:%s, rsp status %d"
,
pTask
->
id
.
idStr
,
rsp
.
reqId
,
rsp
.
upstreamTaskId
,
rsp
.
upstreamNodeId
,
streamGetTaskStatusStr
(
pTask
->
status
.
taskStatus
)
,
rsp
.
status
);
const
char
*
pStatus
=
streamGetTaskStatusStr
(
pTask
->
status
.
taskStatus
);
qDebug
(
"s-task:%s status:%s, recv task check req(reqId:0x%"
PRIx64
") task:0x%x (vgId:%d), ready:%d"
,
pTask
->
id
.
idStr
,
pStatus
,
rsp
.
reqId
,
rsp
.
upstreamTaskId
,
rsp
.
upstreamNodeId
,
rsp
.
status
);
}
else
{
rsp
.
status
=
0
;
qDebug
(
"tq recv task check(taskId:0x%x not built yet) req(reqId:0x%"
PRIx64
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
0bd25489
...
...
@@ -1041,9 +1041,9 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
rsp
.
status
=
streamTaskCheckStatus
(
pTask
);
streamMetaReleaseTask
(
pTq
->
pStreamMeta
,
pTask
);
tqDebug
(
"s-task:%s recv task check req(reqId:0x%"
PRIx64
") task:0x%x (vgId:%d), status:%s, rsp status %d"
,
pTask
->
id
.
idStr
,
rsp
.
reqId
,
rsp
.
upstreamTaskId
,
rsp
.
upstreamNodeId
,
streamGetTaskStatusStr
(
pTask
->
status
.
taskStatus
)
,
rsp
.
status
);
const
char
*
pStatus
=
streamGetTaskStatusStr
(
pTask
->
status
.
taskStatus
);
tqDebug
(
"s-task:%s status:%s, recv task check req(reqId:0x%"
PRIx64
") task:0x%x (vgId:%d), ready:%d"
,
pTask
->
id
.
idStr
,
pStatus
,
rsp
.
reqId
,
rsp
.
upstreamTaskId
,
rsp
.
upstreamNodeId
,
rsp
.
status
);
}
else
{
rsp
.
status
=
0
;
tqDebug
(
"tq recv task check(taskId:0x%x not built yet) req(reqId:0x%"
PRIx64
") from task:0x%x (vgId:%d), rsp status %d"
,
...
...
source/dnode/vnode/src/tq/tqRestore.c
浏览文件 @
0bd25489
...
...
@@ -251,19 +251,18 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
int32_t
status
=
pTask
->
status
.
taskStatus
;
// non-source or fill-history tasks don't need to response the WAL scan action.
if
(
pTask
->
info
.
taskLevel
!=
TASK_LEVEL__SOURCE
)
{
if
(
(
pTask
->
info
.
taskLevel
!=
TASK_LEVEL__SOURCE
)
||
(
pTask
->
status
.
downstreamReady
==
0
)
)
{
streamMetaReleaseTask
(
pStreamMeta
,
pTask
);
continue
;
}
if
(
status
!=
TASK_STATUS__NORMAL
/* && status != TASK_STATUS__SCAN_HISTORY_WAL*/
)
{
if
(
status
!=
TASK_STATUS__NORMAL
)
{
tqDebug
(
"s-task:%s not ready for new submit block from wal, status:%s"
,
pTask
->
id
.
idStr
,
streamGetTaskStatusStr
(
status
));
streamMetaReleaseTask
(
pStreamMeta
,
pTask
);
continue
;
}
if
((
pTask
->
info
.
fillHistory
==
1
)
&&
pTask
->
status
.
transferState
)
{
// ASSERT(status == TASK_STATUS__SCAN_HISTORY_WAL);
ASSERT
(
status
==
TASK_STATUS__NORMAL
);
// the maximum version of data in the WAL has reached already, the step2 is done
tqDebug
(
"s-task:%s fill-history reach the maximum ver:%"
PRId64
", not scan wal anymore"
,
pTask
->
id
.
idStr
,
...
...
source/libs/executor/src/executor.c
浏览文件 @
0bd25489
...
...
@@ -341,6 +341,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v
return
NULL
;
}
qResetStreamInfoTimeWindow
(
pTaskInfo
);
return
pTaskInfo
;
}
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
0bd25489
...
...
@@ -1550,10 +1550,88 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
}
}
static
int32_t
setBlockIntoRes
(
SStreamScanInfo
*
pInfo
,
const
SSDataBlock
*
pBlock
,
bool
filter
)
{
static
void
doBlockDataWindowFilter
(
SSDataBlock
*
pBlock
,
int32_t
tsIndex
,
STimeWindow
*
pWindow
,
const
char
*
id
)
{
if
(
pWindow
->
skey
!=
INT64_MIN
||
pWindow
->
ekey
!=
INT64_MAX
)
{
bool
*
p
=
taosMemoryCalloc
(
pBlock
->
info
.
rows
,
sizeof
(
bool
));
bool
hasUnqualified
=
false
;
SColumnInfoData
*
pCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
tsIndex
);
if
(
pWindow
->
skey
!=
INT64_MIN
)
{
qDebug
(
"%s filter for additional history window, skey:%"
PRId64
,
id
,
pWindow
->
skey
);
ASSERT
(
pCol
->
pData
!=
NULL
);
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
++
i
)
{
int64_t
*
ts
=
(
int64_t
*
)
colDataGetData
(
pCol
,
i
);
printf
(
"-------------------%"
PRId64
"
\n
"
,
*
ts
);
p
[
i
]
=
(
*
ts
>=
pWindow
->
skey
);
if
(
!
p
[
i
])
{
hasUnqualified
=
true
;
}
}
}
else
if
(
pWindow
->
ekey
!=
INT64_MAX
)
{
qDebug
(
"%s filter for additional history window, ekey:%"
PRId64
,
id
,
pWindow
->
ekey
);
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
++
i
)
{
int64_t
*
ts
=
(
int64_t
*
)
colDataGetData
(
pCol
,
i
);
p
[
i
]
=
(
*
ts
<=
pWindow
->
ekey
);
if
(
!
p
[
i
])
{
hasUnqualified
=
true
;
}
}
}
if
(
hasUnqualified
)
{
trimDataBlock
(
pBlock
,
pBlock
->
info
.
rows
,
p
);
}
taosMemoryFree
(
p
);
}
}
// re-build the delete block, ONLY according to the split timestamp
static
void
rebuildDeleteBlockData
(
SSDataBlock
*
pBlock
,
int64_t
skey
,
const
char
*
id
)
{
if
(
skey
==
INT64_MIN
)
{
return
;
}
int32_t
numOfRows
=
pBlock
->
info
.
rows
;
bool
*
p
=
taosMemoryCalloc
(
numOfRows
,
sizeof
(
bool
));
bool
hasUnqualified
=
false
;
SColumnInfoData
*
pSrcStartCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
uint64_t
*
tsStartCol
=
(
uint64_t
*
)
pSrcStartCol
->
pData
;
SColumnInfoData
*
pSrcEndCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
END_TS_COLUMN_INDEX
);
uint64_t
*
tsEndCol
=
(
uint64_t
*
)
pSrcEndCol
->
pData
;
for
(
int32_t
i
=
0
;
i
<
numOfRows
;
i
++
)
{
if
(
tsStartCol
[
i
]
<
skey
)
{
tsStartCol
[
i
]
=
skey
;
}
if
(
tsEndCol
[
i
]
>=
skey
)
{
p
[
i
]
=
true
;
}
else
{
// this row should be removed, since it is not in this query time window, which is [skey, INT64_MAX]
hasUnqualified
=
true
;
}
}
if
(
hasUnqualified
)
{
trimDataBlock
(
pBlock
,
pBlock
->
info
.
rows
,
p
);
}
qDebug
(
"%s re-build delete datablock, start key revised to:%"
PRId64
", rows:%"
PRId64
,
id
,
skey
,
pBlock
->
info
.
rows
);
taosMemoryFree
(
p
);
}
static
int32_t
setBlockIntoRes
(
SStreamScanInfo
*
pInfo
,
const
SSDataBlock
*
pBlock
,
STimeWindow
*
pTimeWindow
,
bool
filter
)
{
SDataBlockInfo
*
pBlockInfo
=
&
pInfo
->
pRes
->
info
;
SOperatorInfo
*
pOperator
=
pInfo
->
pStreamScanOp
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
const
char
*
id
=
GET_TASKID
(
pTaskInfo
);
blockDataEnsureCapacity
(
pInfo
->
pRes
,
pBlock
->
info
.
rows
);
...
...
@@ -1593,7 +1671,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
// currently only the tbname pseudo column
if
(
pInfo
->
numOfPseudoExpr
>
0
)
{
int32_t
code
=
addTagPseudoColumnData
(
&
pInfo
->
readHandle
,
pInfo
->
pPseudoExpr
,
pInfo
->
numOfPseudoExpr
,
pInfo
->
pRes
,
pBlockInfo
->
rows
,
GET_TASKID
(
pTaskInfo
)
,
&
pTableScanInfo
->
base
.
metaCache
);
pBlockInfo
->
rows
,
id
,
&
pTableScanInfo
->
base
.
metaCache
);
// ignore the table not exists error, since this table may have been dropped during the scan procedure.
if
(
code
!=
TSDB_CODE_SUCCESS
&&
code
!=
TSDB_CODE_PAR_TABLE_NOT_EXIST
)
{
blockDataFreeRes
((
SSDataBlock
*
)
pBlock
);
...
...
@@ -1608,8 +1686,14 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
doFilter
(
pInfo
->
pRes
,
pOperator
->
exprSupp
.
pFilterInfo
,
NULL
);
}
pInfo
->
pRes
->
info
.
dataLoad
=
1
;
// filter the block extracted from WAL files, according to the time window apply additional time window filter
doBlockDataWindowFilter
(
pInfo
->
pRes
,
pInfo
->
primaryTsIndex
,
pTimeWindow
,
id
);
blockDataUpdateTsWindow
(
pInfo
->
pRes
,
pInfo
->
primaryTsIndex
);
pInfo
->
pRes
->
info
.
dataLoad
=
1
;
if
(
pInfo
->
pRes
->
info
.
rows
==
0
)
{
return
0
;
}
calBlockTbName
(
pInfo
,
pInfo
->
pRes
);
return
0
;
...
...
@@ -1666,7 +1750,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
qDebug
(
"doQueueScan get data from log %"
PRId64
" rows, version:%"
PRId64
,
pRes
->
info
.
rows
,
pTaskInfo
->
streamInfo
.
currentOffset
.
version
);
blockDataCleanup
(
pInfo
->
pRes
);
setBlockIntoRes
(
pInfo
,
pRes
,
true
);
STimeWindow
defaultWindow
=
{.
skey
=
INT64_MIN
,
.
ekey
=
INT64_MAX
};
setBlockIntoRes
(
pInfo
,
pRes
,
&
defaultWindow
,
true
);
if
(
pInfo
->
pRes
->
info
.
rows
>
0
)
{
return
pInfo
->
pRes
;
}
...
...
@@ -1775,80 +1860,6 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo)
}
}
static
void
doBlockDataWindowFilter
(
SSDataBlock
*
pBlock
,
int32_t
tsIndex
,
STimeWindow
*
pWindow
,
const
char
*
id
)
{
if
(
pWindow
->
skey
!=
INT64_MIN
||
pWindow
->
ekey
!=
INT64_MAX
)
{
bool
*
p
=
taosMemoryCalloc
(
pBlock
->
info
.
rows
,
sizeof
(
bool
));
bool
hasUnqualified
=
false
;
SColumnInfoData
*
pCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
tsIndex
);
if
(
pWindow
->
skey
!=
INT64_MIN
)
{
qDebug
(
"%s filter for additional history window, skey:%"
PRId64
,
id
,
pWindow
->
skey
);
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
++
i
)
{
int64_t
*
ts
=
(
int64_t
*
)
colDataGetData
(
pCol
,
i
);
p
[
i
]
=
(
*
ts
>=
pWindow
->
skey
);
if
(
!
p
[
i
])
{
hasUnqualified
=
true
;
}
}
}
else
if
(
pWindow
->
ekey
!=
INT64_MAX
)
{
qDebug
(
"%s filter for additional history window, ekey:%"
PRId64
,
id
,
pWindow
->
ekey
);
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
++
i
)
{
int64_t
*
ts
=
(
int64_t
*
)
colDataGetData
(
pCol
,
i
);
p
[
i
]
=
(
*
ts
<=
pWindow
->
ekey
);
if
(
!
p
[
i
])
{
hasUnqualified
=
true
;
}
}
}
if
(
hasUnqualified
)
{
trimDataBlock
(
pBlock
,
pBlock
->
info
.
rows
,
p
);
}
taosMemoryFree
(
p
);
}
}
// re-build the delete block, ONLY according to the split timestamp
static
void
rebuildDeleteBlockData
(
SSDataBlock
*
pBlock
,
int64_t
skey
,
const
char
*
id
)
{
if
(
skey
==
INT64_MIN
)
{
return
;
}
int32_t
numOfRows
=
pBlock
->
info
.
rows
;
bool
*
p
=
taosMemoryCalloc
(
numOfRows
,
sizeof
(
bool
));
bool
hasUnqualified
=
false
;
SColumnInfoData
*
pSrcStartCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
START_TS_COLUMN_INDEX
);
uint64_t
*
tsStartCol
=
(
uint64_t
*
)
pSrcStartCol
->
pData
;
SColumnInfoData
*
pSrcEndCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
END_TS_COLUMN_INDEX
);
uint64_t
*
tsEndCol
=
(
uint64_t
*
)
pSrcEndCol
->
pData
;
for
(
int32_t
i
=
0
;
i
<
numOfRows
;
i
++
)
{
if
(
tsStartCol
[
i
]
<
skey
)
{
tsStartCol
[
i
]
=
skey
;
}
if
(
tsEndCol
[
i
]
>=
skey
)
{
p
[
i
]
=
true
;
}
else
{
// this row should be removed, since it is not in this query time window, which is [skey, INT64_MAX]
hasUnqualified
=
true
;
}
}
if
(
hasUnqualified
)
{
trimDataBlock
(
pBlock
,
pBlock
->
info
.
rows
,
p
);
}
qDebug
(
"%s re-build delete datablock, start key revised to:%"
PRId64
", rows:%"
PRId64
,
id
,
skey
,
pBlock
->
info
.
rows
);
taosMemoryFree
(
p
);
}
static
SSDataBlock
*
doStreamScan
(
SOperatorInfo
*
pOperator
)
{
// NOTE: this operator does never check if current status is done or not
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
...
...
@@ -2158,15 +2169,11 @@ FETCH_NEXT_BLOCK:
continue
;
}
// filter the block extracted from WAL files, according to the time window
// apply additional time window filter
doBlockDataWindowFilter
(
pRes
,
pInfo
->
primaryTsIndex
,
&
pStreamInfo
->
fillHistoryWindow
,
id
);
blockDataUpdateTsWindow
(
pInfo
->
pRes
,
pInfo
->
primaryTsIndex
);
if
(
pRes
->
info
.
rows
==
0
)
{
setBlockIntoRes
(
pInfo
,
pRes
,
&
pStreamInfo
->
fillHistoryWindow
,
false
);
if
(
pInfo
->
pRes
->
info
.
rows
==
0
)
{
continue
;
}
setBlockIntoRes
(
pInfo
,
pRes
,
false
);
if
(
pInfo
->
pCreateTbRes
->
info
.
rows
>
0
)
{
pInfo
->
scanMode
=
STREAM_SCAN_FROM_RES
;
qDebug
(
"create table res exists, rows:%"
PRId64
" return from stream scan, %s"
,
pInfo
->
pCreateTbRes
->
info
.
rows
,
id
);
...
...
source/libs/stream/src/stream.c
浏览文件 @
0bd25489
...
...
@@ -379,7 +379,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
return
-
1
;
}
qDebug
(
"s-task:%s
data block enqueue, current(blocks:%d, size:%.2fMiB)
"
,
pTask
->
id
.
idStr
,
total
,
size
);
qDebug
(
"s-task:%s
blockdata enqueue, total in queue:%d, size:%.2fMiB
"
,
pTask
->
id
.
idStr
,
total
,
size
);
int32_t
code
=
taosWriteQitem
(
pTask
->
inputQueue
->
queue
,
pItem
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
destroyStreamDataBlock
((
SStreamDataBlock
*
)
pItem
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录