Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
9adc6988
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
9adc6988
编写于
7月 26, 2022
作者:
L
liuyao
提交者:
GitHub
7月 26, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #15410 from taosdata/feature/TD-17811
feat(stream): refector some log
上级
faedfb54
35ddd566
变更
4
显示空白变更内容
内联
并排
Showing
4 changed file
with
28 addition
and
27 deletion
+28
-27
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+2
-2
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+1
-0
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+24
-24
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+1
-1
未找到文件。
source/common/src/tdatablock.c
浏览文件 @
9adc6988
...
...
@@ -1763,9 +1763,9 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
int32_t
colNum
=
taosArrayGetSize
(
pDataBlock
->
pDataBlock
);
int32_t
rows
=
pDataBlock
->
info
.
rows
;
int32_t
len
=
0
;
len
+=
snprintf
(
dumpBuf
+
len
,
size
-
len
,
"===stream===%s |block type %d
|child id %d|group id:%"
PRIu64
"| uid:%ld|
\n
"
,
flag
,
len
+=
snprintf
(
dumpBuf
+
len
,
size
-
len
,
"===stream===%s |block type %d
|child id %d|group id:%"
PRIu64
"|uid:%ld|rows:%d
\n
"
,
flag
,
(
int32_t
)
pDataBlock
->
info
.
type
,
pDataBlock
->
info
.
childId
,
pDataBlock
->
info
.
groupId
,
pDataBlock
->
info
.
uid
);
pDataBlock
->
info
.
uid
,
pDataBlock
->
info
.
rows
);
if
(
len
>=
size
-
1
)
return
dumpBuf
;
for
(
int32_t
j
=
0
;
j
<
rows
;
j
++
)
{
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
9adc6988
...
...
@@ -1000,6 +1000,7 @@ int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs,
bool
functionNeedToExecute
(
SqlFunctionCtx
*
pCtx
);
bool
isCloseWindow
(
STimeWindow
*
pWin
,
STimeWindowAggSupp
*
pSup
);
void
appendOneRow
(
SSDataBlock
*
pBlock
,
TSKEY
*
pStartTs
,
TSKEY
*
pEndTs
,
uint64_t
*
pUid
);
void
printDataBlock
(
SSDataBlock
*
pBlock
,
const
char
*
flag
);
int32_t
finalizeResultRowIntoResultDataBlock
(
SDiskbasedBuf
*
pBuf
,
SResultRowPosition
*
resultRowPosition
,
SqlFunctionCtx
*
pCtx
,
SExprInfo
*
pExprInfo
,
int32_t
numOfExprs
,
const
int32_t
*
rowCellOffset
,
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
9adc6988
...
...
@@ -2769,7 +2769,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
qDebug
(
"interval status %d %s"
,
pOperator
->
status
,
IS_FINAL_OP
(
pInfo
)
?
"interval
Final"
:
"interval S
emi"
);
qDebug
(
"interval status %d %s"
,
pOperator
->
status
,
IS_FINAL_OP
(
pInfo
)
?
"interval
final"
:
"interval s
emi"
);
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
NULL
;
...
...
@@ -2778,7 +2778,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
if
(
pInfo
->
pPullDataRes
->
info
.
rows
!=
0
)
{
// process the rest of the data
ASSERT
(
IS_FINAL_OP
(
pInfo
));
printDataBlock
(
pInfo
->
pPullDataRes
,
IS_FINAL_OP
(
pInfo
)
?
"interval
Final"
:
"interval S
emi"
);
printDataBlock
(
pInfo
->
pPullDataRes
,
IS_FINAL_OP
(
pInfo
)
?
"interval
final"
:
"interval s
emi"
);
return
pInfo
->
pPullDataRes
;
}
...
...
@@ -2793,20 +2793,20 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
}
return
NULL
;
}
printDataBlock
(
pInfo
->
binfo
.
pRes
,
IS_FINAL_OP
(
pInfo
)
?
"interval
Final"
:
"interval S
emi"
);
printDataBlock
(
pInfo
->
binfo
.
pRes
,
IS_FINAL_OP
(
pInfo
)
?
"interval
final"
:
"interval s
emi"
);
return
pInfo
->
binfo
.
pRes
;
}
else
{
if
(
!
IS_FINAL_OP
(
pInfo
))
{
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
if
(
pInfo
->
binfo
.
pRes
->
info
.
rows
!=
0
)
{
printDataBlock
(
pInfo
->
binfo
.
pRes
,
IS_FINAL_OP
(
pInfo
)
?
"interval
Final"
:
"interval S
emi"
);
printDataBlock
(
pInfo
->
binfo
.
pRes
,
IS_FINAL_OP
(
pInfo
)
?
"interval
final"
:
"interval s
emi"
);
return
pInfo
->
binfo
.
pRes
;
}
}
if
(
pInfo
->
pUpdateRes
->
info
.
rows
!=
0
&&
pInfo
->
returnUpdate
)
{
pInfo
->
returnUpdate
=
false
;
ASSERT
(
!
IS_FINAL_OP
(
pInfo
));
printDataBlock
(
pInfo
->
pUpdateRes
,
IS_FINAL_OP
(
pInfo
)
?
"interval
Final"
:
"interval S
emi"
);
printDataBlock
(
pInfo
->
pUpdateRes
,
IS_FINAL_OP
(
pInfo
)
?
"interval
final"
:
"interval s
emi"
);
// process the rest of the data
return
pInfo
->
pUpdateRes
;
}
...
...
@@ -2814,13 +2814,13 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
// if (pInfo->pPullDataRes->info.rows != 0) {
// // process the rest of the data
// ASSERT(IS_FINAL_OP(pInfo));
// printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval
Final" : "interval S
emi");
// printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval
final" : "interval s
emi");
// return pInfo->pPullDataRes;
// }
doBuildDeleteResult
(
pInfo
->
pDelWins
,
&
pInfo
->
delIndex
,
pInfo
->
pDelRes
);
if
(
pInfo
->
pDelRes
->
info
.
rows
!=
0
)
{
// process the rest of the data
printDataBlock
(
pInfo
->
pDelRes
,
IS_FINAL_OP
(
pInfo
)
?
"interval
Final"
:
"interval S
emi"
);
printDataBlock
(
pInfo
->
pDelRes
,
IS_FINAL_OP
(
pInfo
)
?
"interval
final"
:
"interval s
emi"
);
return
pInfo
->
pDelRes
;
}
}
...
...
@@ -2831,10 +2831,10 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
clearSpecialDataBlock
(
pInfo
->
pUpdateRes
);
removeDeleteResults
(
pUpdated
,
pInfo
->
pDelWins
);
pOperator
->
status
=
OP_RES_TO_RETURN
;
qDebug
(
"%s return data"
,
IS_FINAL_OP
(
pInfo
)
?
"interval
Final"
:
"interval S
emi"
);
qDebug
(
"%s return data"
,
IS_FINAL_OP
(
pInfo
)
?
"interval
final"
:
"interval s
emi"
);
break
;
}
printDataBlock
(
pBlock
,
IS_FINAL_OP
(
pInfo
)
?
"interval
Final recv"
:
"interval S
emi recv"
);
printDataBlock
(
pBlock
,
IS_FINAL_OP
(
pInfo
)
?
"interval
final recv"
:
"interval s
emi recv"
);
maxTs
=
TMAX
(
maxTs
,
pBlock
->
info
.
window
.
ekey
);
if
(
pBlock
->
info
.
type
==
STREAM_NORMAL
||
pBlock
->
info
.
type
==
STREAM_PULL_DATA
||
...
...
@@ -2934,20 +2934,20 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
if
(
pInfo
->
pPullDataRes
->
info
.
rows
!=
0
)
{
// process the rest of the data
ASSERT
(
IS_FINAL_OP
(
pInfo
));
printDataBlock
(
pInfo
->
pPullDataRes
,
IS_FINAL_OP
(
pInfo
)
?
"interval
Final"
:
"interval S
emi"
);
printDataBlock
(
pInfo
->
pPullDataRes
,
IS_FINAL_OP
(
pInfo
)
?
"interval
final"
:
"interval s
emi"
);
return
pInfo
->
pPullDataRes
;
}
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultBuf
);
if
(
pInfo
->
binfo
.
pRes
->
info
.
rows
!=
0
)
{
printDataBlock
(
pInfo
->
binfo
.
pRes
,
IS_FINAL_OP
(
pInfo
)
?
"interval
Final"
:
"interval S
emi"
);
printDataBlock
(
pInfo
->
binfo
.
pRes
,
IS_FINAL_OP
(
pInfo
)
?
"interval
final"
:
"interval s
emi"
);
return
pInfo
->
binfo
.
pRes
;
}
if
(
pInfo
->
pUpdateRes
->
info
.
rows
!=
0
&&
pInfo
->
returnUpdate
)
{
pInfo
->
returnUpdate
=
false
;
ASSERT
(
!
IS_FINAL_OP
(
pInfo
));
printDataBlock
(
pInfo
->
pUpdateRes
,
IS_FINAL_OP
(
pInfo
)
?
"interval
Final"
:
"interval S
emi"
);
printDataBlock
(
pInfo
->
pUpdateRes
,
IS_FINAL_OP
(
pInfo
)
?
"interval
final"
:
"interval s
emi"
);
// process the rest of the data
return
pInfo
->
pUpdateRes
;
}
...
...
@@ -2955,7 +2955,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
doBuildDeleteResult
(
pInfo
->
pDelWins
,
&
pInfo
->
delIndex
,
pInfo
->
pDelRes
);
if
(
pInfo
->
pDelRes
->
info
.
rows
!=
0
)
{
// process the rest of the data
printDataBlock
(
pInfo
->
pDelRes
,
IS_FINAL_OP
(
pInfo
)
?
"interval
Final"
:
"interval S
emi"
);
printDataBlock
(
pInfo
->
pDelRes
,
IS_FINAL_OP
(
pInfo
)
?
"interval
final"
:
"interval s
emi"
);
return
pInfo
->
pDelRes
;
}
// ASSERT(false);
...
...
@@ -3815,14 +3815,14 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
}
else
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
doBuildDeleteDataBlock
(
pInfo
->
pStDeleted
,
pInfo
->
pDelRes
,
&
pInfo
->
pDelIterator
);
if
(
pInfo
->
pDelRes
->
info
.
rows
>
0
)
{
printDataBlock
(
pInfo
->
pDelRes
,
IS_FINAL_OP
(
pInfo
)
?
"
Final Session"
:
"Single S
ession"
);
printDataBlock
(
pInfo
->
pDelRes
,
IS_FINAL_OP
(
pInfo
)
?
"
final session"
:
"single s
ession"
);
return
pInfo
->
pDelRes
;
}
doBuildResultDatablock
(
pOperator
,
pBInfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
streamAggSup
.
pResultBuf
);
if
(
pBInfo
->
pRes
->
info
.
rows
==
0
||
!
hasDataInGroupInfo
(
&
pInfo
->
groupResInfo
))
{
doSetOperatorCompleted
(
pOperator
);
}
printDataBlock
(
pBInfo
->
pRes
,
IS_FINAL_OP
(
pInfo
)
?
"
Final Session"
:
"Single S
ession"
);
printDataBlock
(
pBInfo
->
pRes
,
IS_FINAL_OP
(
pInfo
)
?
"
final session"
:
"single s
ession"
);
return
pBInfo
->
pRes
->
info
.
rows
==
0
?
NULL
:
pBInfo
->
pRes
;
}
...
...
@@ -3835,7 +3835,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
if
(
pBlock
==
NULL
)
{
break
;
}
printDataBlock
(
pBlock
,
IS_FINAL_OP
(
pInfo
)
?
"
Final Session Recv"
:
"Single Session R
ecv"
);
printDataBlock
(
pBlock
,
IS_FINAL_OP
(
pInfo
)
?
"
final session recv"
:
"single session r
ecv"
);
if
(
pBlock
->
info
.
type
==
STREAM_CLEAR
)
{
SArray
*
pWins
=
taosArrayInit
(
16
,
sizeof
(
SResultWindowInfo
));
...
...
@@ -3912,11 +3912,11 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
blockDataEnsureCapacity
(
pInfo
->
binfo
.
pRes
,
pOperator
->
resultInfo
.
capacity
);
doBuildDeleteDataBlock
(
pInfo
->
pStDeleted
,
pInfo
->
pDelRes
,
&
pInfo
->
pDelIterator
);
if
(
pInfo
->
pDelRes
->
info
.
rows
>
0
)
{
printDataBlock
(
pInfo
->
pDelRes
,
IS_FINAL_OP
(
pInfo
)
?
"
Final Session"
:
"Single S
ession"
);
printDataBlock
(
pInfo
->
pDelRes
,
IS_FINAL_OP
(
pInfo
)
?
"
final session"
:
"single s
ession"
);
return
pInfo
->
pDelRes
;
}
doBuildResultDatablock
(
pOperator
,
&
pInfo
->
binfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
streamAggSup
.
pResultBuf
);
printDataBlock
(
pBInfo
->
pRes
,
IS_FINAL_OP
(
pInfo
)
?
"
Final Session"
:
"Single S
ession"
);
printDataBlock
(
pBInfo
->
pRes
,
IS_FINAL_OP
(
pInfo
)
?
"
final session"
:
"single s
ession"
);
return
pBInfo
->
pRes
->
info
.
rows
==
0
?
NULL
:
pBInfo
->
pRes
;
}
...
...
@@ -3955,21 +3955,21 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
}
else
if
(
pOperator
->
status
==
OP_RES_TO_RETURN
)
{
doBuildResultDatablock
(
pOperator
,
pBInfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
streamAggSup
.
pResultBuf
);
if
(
pBInfo
->
pRes
->
info
.
rows
>
0
)
{
printDataBlock
(
pBInfo
->
pRes
,
"
Semi S
ession"
);
printDataBlock
(
pBInfo
->
pRes
,
"
sems s
ession"
);
return
pBInfo
->
pRes
;
}
// doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if
(
pInfo
->
pDelRes
->
info
.
rows
>
0
&&
!
pInfo
->
returnDelete
)
{
pInfo
->
returnDelete
=
true
;
printDataBlock
(
pInfo
->
pDelRes
,
"
Semi S
ession"
);
printDataBlock
(
pInfo
->
pDelRes
,
"
sems s
ession"
);
return
pInfo
->
pDelRes
;
}
if
(
pInfo
->
pUpdateRes
->
info
.
rows
>
0
)
{
// process the rest of the data
pOperator
->
status
=
OP_OPENED
;
printDataBlock
(
pInfo
->
pUpdateRes
,
"
Semi S
ession"
);
printDataBlock
(
pInfo
->
pUpdateRes
,
"
sems s
ession"
);
return
pInfo
->
pUpdateRes
;
}
// semi interval operator clear disk buffer
...
...
@@ -4033,21 +4033,21 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
doBuildResultDatablock
(
pOperator
,
pBInfo
,
&
pInfo
->
groupResInfo
,
pInfo
->
streamAggSup
.
pResultBuf
);
if
(
pBInfo
->
pRes
->
info
.
rows
>
0
)
{
printDataBlock
(
pBInfo
->
pRes
,
"
Semi S
ession"
);
printDataBlock
(
pBInfo
->
pRes
,
"
sems s
ession"
);
return
pBInfo
->
pRes
;
}
// doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if
(
pInfo
->
pDelRes
->
info
.
rows
>
0
&&
!
pInfo
->
returnDelete
)
{
pInfo
->
returnDelete
=
true
;
printDataBlock
(
pInfo
->
pDelRes
,
"
Semi S
ession"
);
printDataBlock
(
pInfo
->
pDelRes
,
"
sems s
ession"
);
return
pInfo
->
pDelRes
;
}
if
(
pInfo
->
pUpdateRes
->
info
.
rows
>
0
)
{
// process the rest of the data
pOperator
->
status
=
OP_OPENED
;
printDataBlock
(
pInfo
->
pUpdateRes
,
"
Semi S
ession"
);
printDataBlock
(
pInfo
->
pUpdateRes
,
"
sems s
ession"
);
return
pInfo
->
pUpdateRes
;
}
...
...
tests/script/jenkins/basic.txt
浏览文件 @
9adc6988
...
...
@@ -224,7 +224,7 @@
# ---- stream
./test.sh -f tsim/stream/basic0.sim
#
./test.sh -f tsim/stream/basic1.sim
./test.sh -f tsim/stream/basic1.sim
./test.sh -f tsim/stream/basic2.sim
./test.sh -f tsim/stream/drop_stream.sim
./test.sh -f tsim/stream/distributeInterval0.sim
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录