Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
5244e6de
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
5244e6de
编写于
9月 21, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
9月 21, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #16980 from taosdata/feature/TD-18820
fix(stream): free array
上级
de164f81
5ca1dc10
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
6 addition
and
35 deletion
+6
-35
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+1
-0
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+5
-35
未找到文件。
source/libs/executor/src/scanoperator.c
浏览文件 @
5244e6de
...
...
@@ -1687,6 +1687,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
while
(
1
)
{
if
(
pInfo
->
tqReader
->
pMsg
==
NULL
)
{
if
(
pInfo
->
validBlockIndex
>=
totBlockNum
)
{
updateInfoDestoryColseWinSBF
(
pInfo
->
pUpdateInfo
);
return
NULL
;
}
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
5244e6de
...
...
@@ -1695,6 +1695,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
}
nodesDestroyNode
((
SNode
*
)
pInfo
->
pPhyNode
);
colDataDestroy
(
&
pInfo
->
twAggSup
.
timeWindowData
);
cleanupGroupResInfo
(
&
pInfo
->
groupResInfo
);
taosMemoryFreeClear
(
param
);
}
...
...
@@ -3073,6 +3074,7 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap) {
taosArrayRemove
(
chArray
,
index
);
if
(
taosArrayGetSize
(
chArray
)
==
0
)
{
// pull data is over
taosArrayDestroy
(
chArray
);
taosHashRemove
(
pMap
,
&
winRes
,
sizeof
(
SWinKey
));
}
}
...
...
@@ -3109,9 +3111,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SStreamFinalIntervalOperatorInfo
*
pInfo
=
pOperator
->
info
;
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
SArray
*
pUpdated
=
taosArrayInit
(
4
,
POINTER_BYTES
);
_hash_fn_t
hashFn
=
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
);
SHashObj
*
pUpdatedMap
=
taosHashInit
(
1024
,
hashFn
,
false
,
HASH_NO_LOCK
);
TSKEY
maxTs
=
INT64_MIN
;
TSKEY
minTs
=
INT64_MAX
;
...
...
@@ -3175,6 +3174,9 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
}
}
SArray
*
pUpdated
=
taosArrayInit
(
4
,
POINTER_BYTES
);
_hash_fn_t
hashFn
=
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
);
SHashObj
*
pUpdatedMap
=
taosHashInit
(
1024
,
hashFn
,
false
,
HASH_NO_LOCK
);
while
(
1
)
{
SSDataBlock
*
pBlock
=
downstream
->
fpSet
.
getNextFn
(
downstream
);
if
(
pBlock
==
NULL
)
{
...
...
@@ -5755,8 +5757,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
_hash_fn_t
hashFn
=
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
);
SHashObj
*
pUpdatedMap
=
taosHashInit
(
1024
,
hashFn
,
false
,
HASH_NO_LOCK
);
SStreamState
*
pState
=
pTaskInfo
->
streamInfo
.
pState
;
while
(
1
)
{
SSDataBlock
*
pBlock
=
downstream
->
fpSet
.
getNextFn
(
downstream
);
if
(
pBlock
==
NULL
)
{
...
...
@@ -5805,36 +5805,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
}
pInfo
->
twAggSup
.
maxTs
=
TMAX
(
pInfo
->
twAggSup
.
maxTs
,
maxTs
);
pInfo
->
twAggSup
.
minTs
=
TMIN
(
pInfo
->
twAggSup
.
minTs
,
minTs
);
#if 0
if (pState) {
printf(">>>>>>>> stream read backend\n");
SWinKey key = {
.ts = 1,
.groupId = 2,
};
char* val = NULL;
int32_t sz;
if (streamStateGet(pState, &key, (void**)&val, &sz) < 0) {
ASSERT(0);
}
printf("stream read %s %d\n", val, sz);
streamFreeVal(val);
SStreamStateCur* pCur = streamStateGetCur(pState, &key);
ASSERT(pCur);
while (streamStateCurNext(pState, pCur) == 0) {
SWinKey key1;
const void* val1;
if (streamStateGetKVByCur(pCur, &key1, &val1, &sz) < 0) {
break;
}
printf("stream iter key groupId:%d ts:%d, value %s %d\n", key1.groupId, key1.ts, val1, sz);
}
streamStateFreeCur(pCur);
}
#endif
pOperator
->
status
=
OP_RES_TO_RETURN
;
closeStreamIntervalWindow
(
pInfo
->
aggSup
.
pResultRowHashTable
,
&
pInfo
->
twAggSup
,
&
pInfo
->
interval
,
NULL
,
pUpdatedMap
,
pOperator
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录