Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
4e9146f0
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
4e9146f0
编写于
2月 14, 2023
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix(query): fix memory leak.
上级
a1eafe88
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
62 addition
and
35 deletion
+62
-35
source/libs/executor/inc/executil.h
source/libs/executor/inc/executil.h
+1
-1
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+43
-3
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+17
-30
tests/system-test/2-query/unique.py
tests/system-test/2-query/unique.py
+1
-1
未找到文件。
source/libs/executor/inc/executil.h
浏览文件 @
4e9146f0
...
@@ -127,7 +127,7 @@ static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRo
...
@@ -127,7 +127,7 @@ static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRo
void
initGroupedResultInfo
(
SGroupResInfo
*
pGroupResInfo
,
SSHashObj
*
pHashmap
,
int32_t
order
);
void
initGroupedResultInfo
(
SGroupResInfo
*
pGroupResInfo
,
SSHashObj
*
pHashmap
,
int32_t
order
);
void
cleanupGroupResInfo
(
SGroupResInfo
*
pGroupResInfo
);
void
cleanupGroupResInfo
(
SGroupResInfo
*
pGroupResInfo
);
void
initMultiResInfoFromArrayList
(
SGroupResInfo
*
pGroupResInfo
,
SArray
*
pArrayList
);
int32_t
initMultiResInfoFromArrayList
(
SGroupResInfo
*
pGroupResInfo
,
SHashObj
*
pResultHash
);
bool
hasRemainResults
(
SGroupResInfo
*
pGroupResInfo
);
bool
hasRemainResults
(
SGroupResInfo
*
pGroupResInfo
);
int32_t
getNumOfTotalRes
(
SGroupResInfo
*
pGroupResInfo
);
int32_t
getNumOfTotalRes
(
SGroupResInfo
*
pGroupResInfo
);
...
...
source/libs/executor/src/executil.c
浏览文件 @
4e9146f0
...
@@ -162,14 +162,54 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, in
...
@@ -162,14 +162,54 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, in
assert
(
pGroupResInfo
->
index
<=
getNumOfTotalRes
(
pGroupResInfo
));
assert
(
pGroupResInfo
->
index
<=
getNumOfTotalRes
(
pGroupResInfo
));
}
}
void
initMultiResInfoFromArrayList
(
SGroupResInfo
*
pGroupResInfo
,
SArray
*
pArrayList
)
{
int32_t
initMultiResInfoFromArrayList
(
SGroupResInfo
*
pGroupResInfo
,
SHashObj
*
pResultHash
)
{
int32_t
itemSize
=
sizeof
(
SResKeyPos
)
+
sizeof
(
uint64_t
);
int32_t
bufLen
=
taosHashGetSize
(
pResultHash
)
*
itemSize
;
int32_t
offset
=
0
;
void
*
pIter
=
NULL
;
int32_t
numOfRows
=
taosHashGetSize
(
pResultHash
);
if
(
pGroupResInfo
->
pRows
!=
NULL
)
{
if
(
pGroupResInfo
->
pRows
!=
NULL
)
{
taosArrayDestroyP
(
pGroupResInfo
->
pRows
,
taosMemoryFree
);
taosArrayClear
(
pGroupResInfo
->
pRows
);
}
else
{
pGroupResInfo
->
pRows
=
taosArrayInit
(
numOfRows
,
sizeof
(
void
*
));
}
if
(
numOfRows
==
0
)
{
pGroupResInfo
->
index
=
0
;
return
TSDB_CODE_SUCCESS
;
}
if
(
pGroupResInfo
->
pBuf
==
NULL
)
{
pGroupResInfo
->
pBuf
=
taosMemoryMalloc
(
bufLen
);
if
(
pGroupResInfo
->
pBuf
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
}
else
{
char
*
p
=
taosMemoryRealloc
(
pGroupResInfo
->
pBuf
,
bufLen
);
if
(
p
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pGroupResInfo
->
pBuf
=
p
;
}
}
pGroupResInfo
->
pRows
=
pArrayList
;
while
((
pIter
=
taosHashIterate
(
pResultHash
,
pIter
))
!=
NULL
)
{
SResKeyPos
*
p
=
(
SResKeyPos
*
)
(
pGroupResInfo
->
pBuf
+
offset
);
SResKeyPos
*
p1
=
pIter
;
qDebug
(
"key:%"
PRId64
", gid:%"
PRId64
,
*
(
uint64_t
*
)
p1
->
key
,
p1
->
groupId
);
memcpy
(
p
,
p1
,
itemSize
);
taosArrayPush
(
pGroupResInfo
->
pRows
,
&
p
);
offset
+=
itemSize
;
}
taosSort
(
pGroupResInfo
->
pRows
->
pData
,
taosArrayGetSize
(
pGroupResInfo
->
pRows
),
sizeof
(
void
*
),
resultrowComparAsc
);
pGroupResInfo
->
index
=
0
;
pGroupResInfo
->
index
=
0
;
ASSERT
(
pGroupResInfo
->
index
<=
getNumOfTotalRes
(
pGroupResInfo
));
ASSERT
(
pGroupResInfo
->
index
<=
getNumOfTotalRes
(
pGroupResInfo
));
return
TSDB_CODE_SUCCESS
;
}
}
bool
hasRemainResults
(
SGroupResInfo
*
pGroupResInfo
)
{
bool
hasRemainResults
(
SGroupResInfo
*
pGroupResInfo
)
{
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
4e9146f0
...
@@ -843,19 +843,15 @@ static int32_t saveResult(SResultWindowInfo winInfo, SSHashObj* pStUpdated) {
...
@@ -843,19 +843,15 @@ static int32_t saveResult(SResultWindowInfo winInfo, SSHashObj* pStUpdated) {
}
}
static
int32_t
saveWinResult
(
int64_t
ts
,
int32_t
pageId
,
int32_t
offset
,
uint64_t
groupId
,
SHashObj
*
pUpdatedMap
)
{
static
int32_t
saveWinResult
(
int64_t
ts
,
int32_t
pageId
,
int32_t
offset
,
uint64_t
groupId
,
SHashObj
*
pUpdatedMap
)
{
SResKeyPos
*
newPos
=
taosMemoryMalloc
(
sizeof
(
SResKeyPos
)
+
sizeof
(
uint64_t
));
char
buf
[
sizeof
(
SResKeyPos
)
+
sizeof
(
uint64_t
)]
=
{
0
};
if
(
newPos
==
NULL
)
{
SResKeyPos
*
pResPos
=
(
SResKeyPos
*
)
buf
;
return
TSDB_CODE_OUT_OF_MEMORY
;
}
newPos
->
groupId
=
groupId
;
*
(
int64_t
*
)
pResPos
->
key
=
ts
;
newPos
->
pos
=
(
SResultRowPosition
){.
pageId
=
pageId
,
.
offset
=
offset
};
pResPos
->
groupId
=
groupId
;
*
(
int64_t
*
)
newPos
->
key
=
ts
;
pResPos
->
pos
=
(
SResultRowPosition
){.
pageId
=
pageId
,
.
offset
=
offset
};
SWinKey
key
=
{.
ts
=
ts
,
.
groupId
=
groupId
};
if
(
taosHashPut
(
pUpdatedMap
,
&
key
,
sizeof
(
SWinKey
),
&
newPos
,
sizeof
(
void
*
))
!=
TSDB_CODE_SUCCESS
)
{
taosMemoryFree
(
newPos
);
}
SWinKey
key
=
{.
ts
=
ts
,
.
groupId
=
groupId
};
taosHashPut
(
pUpdatedMap
,
&
key
,
sizeof
(
SWinKey
),
pResPos
,
sizeof
(
SResKeyPos
)
+
sizeof
(
uint64_t
));
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
...
@@ -2568,7 +2564,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
...
@@ -2568,7 +2564,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
}
}
}
}
SArray
*
pUpdated
=
taosArrayInit
(
4
,
POINTER_BYTES
);
// SArray* pUpdated = taosArrayInit(4, sizeof(SResKeyPos));
_hash_fn_t
hashFn
=
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
);
_hash_fn_t
hashFn
=
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
);
SHashObj
*
pUpdatedMap
=
taosHashInit
(
1024
,
hashFn
,
false
,
HASH_NO_LOCK
);
SHashObj
*
pUpdatedMap
=
taosHashInit
(
1024
,
hashFn
,
false
,
HASH_NO_LOCK
);
while
(
1
)
{
while
(
1
)
{
...
@@ -2610,9 +2607,9 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
...
@@ -2610,9 +2607,9 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
continue
;
continue
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_RETRIEVE
&&
!
IS_FINAL_OP
(
pInfo
))
{
}
else
if
(
pBlock
->
info
.
type
==
STREAM_RETRIEVE
&&
!
IS_FINAL_OP
(
pInfo
))
{
doDeleteWindows
(
pOperator
,
&
pInfo
->
interval
,
pBlock
,
NULL
,
pUpdatedMap
);
doDeleteWindows
(
pOperator
,
&
pInfo
->
interval
,
pBlock
,
NULL
,
pUpdatedMap
);
if
(
taosArrayGetSize
(
pUpdated
)
>
0
)
{
//
if (taosArrayGetSize(pUpdated) > 0) {
break
;
//
break;
}
//
}
continue
;
continue
;
}
else
if
(
pBlock
->
info
.
type
==
STREAM_PULL_OVER
&&
IS_FINAL_OP
(
pInfo
))
{
}
else
if
(
pBlock
->
info
.
type
==
STREAM_PULL_OVER
&&
IS_FINAL_OP
(
pInfo
))
{
processPullOver
(
pBlock
,
pInfo
->
pPullDataMap
,
&
pInfo
->
interval
);
processPullOver
(
pBlock
,
pInfo
->
pPullDataMap
,
&
pInfo
->
interval
);
...
@@ -2659,14 +2656,10 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
...
@@ -2659,14 +2656,10 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
}
}
pInfo
->
binfo
.
pRes
->
info
.
watermark
=
pInfo
->
twAggSup
.
maxTs
;
pInfo
->
binfo
.
pRes
->
info
.
watermark
=
pInfo
->
twAggSup
.
maxTs
;
void
*
pIte
=
NULL
;
// todo
while
((
pIte
=
taosHashIterate
(
pUpdatedMap
,
pIte
))
!=
NULL
)
{
int32_t
code
=
initMultiResInfoFromArrayList
(
&
pInfo
->
groupResInfo
,
pUpdatedMap
);
taosArrayPush
(
pUpdated
,
pIte
);
}
taosHashCleanup
(
pUpdatedMap
);
taosArraySort
(
pUpdated
,
resultrowComparAsc
);
initMultiResInfoFromArrayList
(
&
pInfo
->
groupResInfo
,
pUpdated
);
taosHashCleanup
(
pUpdatedMap
);
blockDataEnsureCapacity
(
pInfo
->
binfo
.
pRes
,
pOperator
->
resultInfo
.
capacity
);
blockDataEnsureCapacity
(
pInfo
->
binfo
.
pRes
,
pOperator
->
resultInfo
.
capacity
);
doBuildPullDataBlock
(
pInfo
->
pPullWins
,
&
pInfo
->
pullIndex
,
pInfo
->
pPullDataRes
);
doBuildPullDataBlock
(
pInfo
->
pPullWins
,
&
pInfo
->
pullIndex
,
pInfo
->
pPullDataRes
);
...
@@ -4755,7 +4748,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
...
@@ -4755,7 +4748,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
SArray
*
pUpdated
=
taosArrayInit
(
4
,
POINTER_BYTES
);
// SResKeyPos
_hash_fn_t
hashFn
=
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
);
_hash_fn_t
hashFn
=
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
);
SHashObj
*
pUpdatedMap
=
taosHashInit
(
1024
,
hashFn
,
false
,
HASH_NO_LOCK
);
SHashObj
*
pUpdatedMap
=
taosHashInit
(
1024
,
hashFn
,
false
,
HASH_NO_LOCK
);
...
@@ -4808,13 +4800,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
...
@@ -4808,13 +4800,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
closeStreamIntervalWindow
(
pInfo
->
aggSup
.
pResultRowHashTable
,
&
pInfo
->
twAggSup
,
&
pInfo
->
interval
,
NULL
,
pUpdatedMap
,
closeStreamIntervalWindow
(
pInfo
->
aggSup
.
pResultRowHashTable
,
&
pInfo
->
twAggSup
,
&
pInfo
->
interval
,
NULL
,
pUpdatedMap
,
pInfo
->
pDelWins
,
pOperator
);
pInfo
->
pDelWins
,
pOperator
);
void
*
pIte
=
NULL
;
// todo
while
((
pIte
=
taosHashIterate
(
pUpdatedMap
,
pIte
))
!=
NULL
)
{
int32_t
code
=
initMultiResInfoFromArrayList
(
&
pInfo
->
groupResInfo
,
pUpdatedMap
);
taosArrayPush
(
pUpdated
,
pIte
);
}
taosArraySort
(
pUpdated
,
resultrowComparAsc
);
initMultiResInfoFromArrayList
(
&
pInfo
->
groupResInfo
,
pUpdated
);
blockDataEnsureCapacity
(
pInfo
->
binfo
.
pRes
,
pOperator
->
resultInfo
.
capacity
);
blockDataEnsureCapacity
(
pInfo
->
binfo
.
pRes
,
pOperator
->
resultInfo
.
capacity
);
taosHashCleanup
(
pUpdatedMap
);
taosHashCleanup
(
pUpdatedMap
);
...
...
tests/system-test/2-query/unique.py
浏览文件 @
4e9146f0
...
@@ -433,7 +433,7 @@ class TDTestCase:
...
@@ -433,7 +433,7 @@ class TDTestCase:
tdSql
.
checkRows
(
11
)
tdSql
.
checkRows
(
11
)
tdSql
.
checkData
(
1
,
0
,
0
)
tdSql
.
checkData
(
1
,
0
,
0
)
tdSql
.
checkData
(
10
,
0
,
9
)
tdSql
.
checkData
(
10
,
0
,
9
)
tdSql
.
query
(
f
"select unique(t1)
from (select _rowts , t1 , tbname from
{
dbname
}
.stb1 )
"
)
tdSql
.
query
(
f
"select unique(t1)
v from (select _rowts , t1 , tbname from
{
dbname
}
.stb1 ) order by v desc
"
)
tdSql
.
checkRows
(
2
)
tdSql
.
checkRows
(
2
)
tdSql
.
checkData
(
0
,
0
,
4
)
tdSql
.
checkData
(
0
,
0
,
4
)
tdSql
.
checkData
(
1
,
0
,
1
)
tdSql
.
checkData
(
1
,
0
,
1
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录