Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
5dff136b
T
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
5dff136b
编写于
7月 09, 2022
作者:
H
Haojun Liao
提交者:
GitHub
7月 09, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #14689 from taosdata/feature/3_liaohj
fix(query): fix memory leak.
上级
ebc9e438
fceee026
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
81 addition
and
46 deletion
+81
-46
include/libs/function/function.h
include/libs/function/function.h
+6
-0
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+12
-3
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+8
-5
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+2
-1
source/libs/scalar/inc/sclInt.h
source/libs/scalar/inc/sclInt.h
+1
-1
source/libs/scalar/src/filter.c
source/libs/scalar/src/filter.c
+9
-2
source/libs/scalar/src/scalar.c
source/libs/scalar/src/scalar.c
+37
-26
source/libs/scalar/src/sclvector.c
source/libs/scalar/src/sclvector.c
+6
-8
未找到文件。
include/libs/function/function.h
浏览文件 @
5dff136b
...
...
@@ -172,7 +172,13 @@ typedef struct tExprNode {
void
tExprTreeDestroy
(
tExprNode
*
pNode
,
void
(
*
fp
)(
void
*
));
typedef
enum
{
SHOULD_FREE_COLDATA
=
0x1
,
// the newly created column data needs to be destroyed.
DELEGATED_MGMT_COLDATA
=
0x2
,
// input column data should not be released.
}
ECOLDATA_MGMT_TYPE_E
;
struct
SScalarParam
{
ECOLDATA_MGMT_TYPE_E
type
;
SColumnInfoData
*
columnData
;
SHashObj
*
pHashFilter
;
int32_t
hashValueType
;
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
5dff136b
...
...
@@ -2182,12 +2182,21 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* ret
return
VND_TSDB
(
pVnode
);
}
static
SVersionRange
getQueryVerRange
(
SVnode
*
pVnode
,
SQueryTableDataCond
*
pCond
,
int8_t
level
)
{
SVersionRange
getQueryVerRange
(
SVnode
*
pVnode
,
SQueryTableDataCond
*
pCond
,
int8_t
level
)
{
int64_t
startVer
=
(
pCond
->
startVersion
==
-
1
)
?
0
:
pCond
->
startVersion
;
if
(
VND_IS_RSMA
(
pVnode
))
{
return
(
SVersionRange
){.
minVer
=
pCond
->
startVersion
,
.
maxVer
=
tdRSmaGetMaxSubmitVer
(
pVnode
->
pSma
,
level
)};
return
(
SVersionRange
){.
minVer
=
startVer
,
.
maxVer
=
tdRSmaGetMaxSubmitVer
(
pVnode
->
pSma
,
level
)};
}
int64_t
endVer
=
0
;
if
(
pCond
->
endVersion
==
-
1
)
{
// user not specified end version, set current maximum version of vnode as the endVersion
endVer
=
pVnode
->
state
.
applied
;
}
else
{
endVer
=
(
pCond
->
endVersion
>
pVnode
->
state
.
applied
)
?
pVnode
->
state
.
applied
:
pCond
->
endVersion
;
}
return
(
SVersionRange
){.
minVer
=
pCond
->
startVersion
,
.
maxVer
=
pVnode
->
state
.
applied
};
return
(
SVersionRange
){.
minVer
=
startVer
,
.
maxVer
=
endVer
};
}
// // todo not unref yet, since it is not support multi-group interpolation query
...
...
source/libs/executor/src/executil.c
浏览文件 @
5dff136b
...
...
@@ -748,11 +748,12 @@ SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) {
SColumn
extractColumnFromColumnNode
(
SColumnNode
*
pColNode
)
{
SColumn
c
=
{
0
};
c
.
slotId
=
pColNode
->
slotId
;
c
.
colId
=
pColNode
->
colId
;
c
.
type
=
pColNode
->
node
.
resType
.
type
;
c
.
bytes
=
pColNode
->
node
.
resType
.
bytes
;
c
.
scale
=
pColNode
->
node
.
resType
.
scale
;
c
.
slotId
=
pColNode
->
slotId
;
c
.
colId
=
pColNode
->
colId
;
c
.
type
=
pColNode
->
node
.
resType
.
type
;
c
.
bytes
=
pColNode
->
node
.
resType
.
bytes
;
c
.
scale
=
pColNode
->
node
.
resType
.
scale
;
c
.
precision
=
pColNode
->
node
.
resType
.
precision
;
return
c
;
}
...
...
@@ -774,6 +775,8 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
pCond
->
suid
=
pTableScanNode
->
scan
.
suid
;
pCond
->
type
=
BLOCK_LOAD_OFFSET_ORDER
;
pCond
->
startVersion
=
-
1
;
pCond
->
endVersion
=
-
1
;
// pCond->type = pTableScanNode->scanFlag;
int32_t
j
=
0
;
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
5dff136b
...
...
@@ -210,7 +210,8 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
bool
allColumnsHaveAgg
=
true
;
SColumnDataAgg
**
pColAgg
=
NULL
;
int32_t
code
=
tsdbRetrieveDatablockSMA
(
pTableScanInfo
->
dataReader
,
&
pColAgg
,
&
allColumnsHaveAgg
);
int32_t
code
=
tsdbRetrieveDatablockSMA
(
pTableScanInfo
->
dataReader
,
&
pColAgg
,
&
allColumnsHaveAgg
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
code
);
}
...
...
source/libs/scalar/inc/sclInt.h
浏览文件 @
5dff136b
...
...
@@ -57,7 +57,7 @@ typedef struct SScalarCtx {
#define SCL_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
int32_t
doConvertDataType
(
SValueNode
*
pValueNode
,
SScalarParam
*
out
);
SColumnInfoData
*
sclCreateColumnInfoData
(
SDataType
*
pType
,
int32_t
numOfRows
);
int32_t
sclCreateColumnInfoData
(
SDataType
*
pType
,
int32_t
numOfRows
,
SScalarParam
*
pParam
);
int32_t
sclConvertToTsValueNode
(
int8_t
precision
,
SValueNode
*
valueNode
);
#define GET_PARAM_TYPE(_c) ((_c)->columnData ? (_c)->columnData->info.type : (_c)->hashValueType)
...
...
source/libs/scalar/src/filter.c
浏览文件 @
5dff136b
...
...
@@ -3827,13 +3827,20 @@ bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, int8_t** p, SColumnData
SScalarParam
output
=
{
0
};
SDataType
type
=
{.
type
=
TSDB_DATA_TYPE_BOOL
,
.
bytes
=
sizeof
(
bool
)};
output
.
columnData
=
sclCreateColumnInfoData
(
&
type
,
pSrc
->
info
.
rows
);
int32_t
code
=
sclCreateColumnInfoData
(
&
type
,
pSrc
->
info
.
rows
,
&
output
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
SArray
*
pList
=
taosArrayInit
(
1
,
POINTER_BYTES
);
taosArrayPush
(
pList
,
&
pSrc
);
FLT_ERR_RET
(
scalarCalculate
(
info
->
sclCtx
.
node
,
pList
,
&
output
));
*
p
=
(
int8_t
*
)
output
.
columnData
->
pData
;
*
p
=
taosMemoryMalloc
(
output
.
numOfRows
*
sizeof
(
bool
));
memcpy
(
*
p
,
output
.
columnData
->
pData
,
output
.
numOfRows
);
colDataDestroy
(
output
.
columnData
);
taosMemoryFree
(
output
.
columnData
);
taosArrayDestroy
(
pList
);
return
false
;
...
...
source/libs/scalar/src/scalar.c
浏览文件 @
5dff136b
...
...
@@ -35,12 +35,11 @@ int32_t sclConvertToTsValueNode(int8_t precision, SValueNode* valueNode) {
return
TSDB_CODE_SUCCESS
;
}
SColumnInfoData
*
sclCreateColumnInfoData
(
SDataType
*
pType
,
int32_t
numOfRows
)
{
int32_t
sclCreateColumnInfoData
(
SDataType
*
pType
,
int32_t
numOfRows
,
SScalarParam
*
pParam
)
{
SColumnInfoData
*
pColumnData
=
taosMemoryCalloc
(
1
,
sizeof
(
SColumnInfoData
));
if
(
pColumnData
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
return
terrno
;
}
pColumnData
->
info
.
type
=
pType
->
type
;
...
...
@@ -52,19 +51,25 @@ SColumnInfoData* sclCreateColumnInfoData(SDataType* pType, int32_t numOfRows) {
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
taosMemoryFree
(
pColumnData
);
return
NULL
;
}
else
{
return
pColumnData
;
return
terrno
;
}
pParam
->
columnData
=
pColumnData
;
pParam
->
type
=
SHOULD_FREE_COLDATA
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
doConvertDataType
(
SValueNode
*
pValueNode
,
SScalarParam
*
out
)
{
SScalarParam
in
=
{.
numOfRows
=
1
};
in
.
columnData
=
sclCreateColumnInfoData
(
&
pValueNode
->
node
.
resType
,
1
);
int32_t
code
=
sclCreateColumnInfoData
(
&
pValueNode
->
node
.
resType
,
1
,
&
in
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
colDataAppend
(
in
.
columnData
,
0
,
nodesGetValueFromNode
(
pValueNode
),
false
);
colInfoDataEnsureCapacity
(
out
->
columnData
,
1
);
int32_t
code
=
vectorConvertImpl
(
&
in
,
out
);
code
=
vectorConvertImpl
(
&
in
,
out
);
sclFreeParam
(
&
in
);
return
code
;
...
...
@@ -157,7 +162,7 @@ void sclFreeRes(SHashObj *res) {
void
sclFreeParam
(
SScalarParam
*
param
)
{
if
(
param
->
columnData
!=
NULL
)
{
colDataDestroy
(
param
->
columnData
);
taosMemoryFree
(
param
->
columnData
);
taosMemoryFree
Clear
(
param
->
columnData
);
}
if
(
param
->
pHashFilter
!=
NULL
)
{
...
...
@@ -190,8 +195,9 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t
case
QUERY_NODE_VALUE
:
{
SValueNode
*
valueNode
=
(
SValueNode
*
)
node
;
ASSERT
(
param
->
columnData
==
NULL
);
param
->
numOfRows
=
1
;
param
->
columnData
=
sclCreateColumnInfoData
(
&
valueNode
->
node
.
resType
,
1
);
/*int32_t code = */
sclCreateColumnInfoData
(
&
valueNode
->
node
.
resType
,
1
,
param
);
if
(
TSDB_DATA_TYPE_NULL
==
valueNode
->
node
.
resType
.
type
||
valueNode
->
isNull
)
{
colDataAppendNULL
(
param
->
columnData
,
0
);
}
else
{
...
...
@@ -429,10 +435,9 @@ int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outp
SCL_ERR_JRET
(
code
);
}
output
->
columnData
=
sclCreateColumnInfoData
(
&
node
->
node
.
resType
,
rowNum
);
if
(
output
->
columnData
==
NULL
)
{
sclError
(
"calloc %d failed"
,
(
int32_t
)(
rowNum
*
output
->
columnData
->
info
.
bytes
));
SCL_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
code
=
sclCreateColumnInfoData
(
&
node
->
node
.
resType
,
rowNum
,
output
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
SCL_ERR_JRET
(
code
);
}
code
=
(
*
ffpSet
.
process
)(
params
,
paramNum
,
output
);
...
...
@@ -482,10 +487,9 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o
output
->
numOfRows
=
rowNum
;
SDataType
t
=
{.
type
=
type
,
.
bytes
=
tDataTypes
[
type
].
bytes
};
output
->
columnData
=
sclCreateColumnInfoData
(
&
t
,
rowNum
);
if
(
output
->
columnData
==
NULL
)
{
sclError
(
"calloc %d failed"
,
(
int32_t
)(
rowNum
*
sizeof
(
bool
)));
SCL_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
code
=
sclCreateColumnInfoData
(
&
t
,
rowNum
,
output
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
SCL_ERR_JRET
(
code
);
}
bool
value
=
false
;
...
...
@@ -537,18 +541,19 @@ int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *outp
int32_t
code
=
0
;
// json not support in in operator
if
(
nodeType
(
node
->
pLeft
)
==
QUERY_NODE_VALUE
)
{
if
(
nodeType
(
node
->
pLeft
)
==
QUERY_NODE_VALUE
)
{
SValueNode
*
valueNode
=
(
SValueNode
*
)
node
->
pLeft
;
if
(
valueNode
->
node
.
resType
.
type
==
TSDB_DATA_TYPE_JSON
&&
(
node
->
opType
==
OP_TYPE_IN
||
node
->
opType
==
OP_TYPE_NOT_IN
))
{
if
(
valueNode
->
node
.
resType
.
type
==
TSDB_DATA_TYPE_JSON
&&
(
node
->
opType
==
OP_TYPE_IN
||
node
->
opType
==
OP_TYPE_NOT_IN
))
{
SCL_RET
(
TSDB_CODE_QRY_JSON_IN_ERROR
);
}
}
SCL_ERR_RET
(
sclInitOperatorParams
(
&
params
,
node
,
ctx
,
&
rowNum
));
output
->
columnData
=
sclCreateColumnInfoData
(
&
node
->
node
.
resType
,
rowNum
);
if
(
output
->
columnData
==
NULL
)
{
sclError
(
"calloc failed, size:%d"
,
(
int32_t
)
rowNum
*
node
->
node
.
resType
.
bytes
);
SCL_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
code
=
sclCreateColumnInfoData
(
&
node
->
node
.
resType
,
rowNum
,
output
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
SCL_ERR_JRET
(
code
);
}
}
_bin_scalar_fn_t
OperatorFn
=
getBinScalarOperatorFn
(
node
->
opType
);
...
...
@@ -563,7 +568,10 @@ int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *outp
_return:
for
(
int32_t
i
=
0
;
i
<
paramNum
;
++
i
)
{
// sclFreeParam(¶ms[i]);
if
(
params
[
i
].
type
==
SHOULD_FREE_COLDATA
)
{
colDataDestroy
(
params
[
i
].
columnData
);
taosMemoryFreeClear
(
params
[
i
].
columnData
);
}
}
taosMemoryFreeClear
(
params
);
...
...
@@ -766,7 +774,7 @@ EDealRes sclRewriteOperator(SNode** pNode, SScalarCtx *ctx) {
return
sclRewriteNonConstOperator
(
pNode
,
ctx
);
}
SScalarParam
output
=
{
.
columnData
=
taosMemoryCalloc
(
1
,
sizeof
(
SColumnInfoData
))
};
SScalarParam
output
=
{
0
};
ctx
->
code
=
sclExecOperator
(
node
,
ctx
,
&
output
);
if
(
ctx
->
code
)
{
return
DEAL_RES_ERROR
;
...
...
@@ -834,6 +842,7 @@ EDealRes sclWalkFunction(SNode* pNode, SScalarCtx *ctx) {
return
DEAL_RES_ERROR
;
}
output
.
type
=
DELEGATED_MGMT_COLDATA
;
if
(
taosHashPut
(
ctx
->
pRes
,
&
pNode
,
POINTER_BYTES
,
&
output
,
sizeof
(
output
)))
{
ctx
->
code
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
return
DEAL_RES_ERROR
;
...
...
@@ -868,6 +877,7 @@ EDealRes sclWalkOperator(SNode* pNode, SScalarCtx *ctx) {
return
DEAL_RES_ERROR
;
}
output
.
type
=
DELEGATED_MGMT_COLDATA
;
if
(
taosHashPut
(
ctx
->
pRes
,
&
pNode
,
POINTER_BYTES
,
&
output
,
sizeof
(
output
)))
{
ctx
->
code
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
return
DEAL_RES_ERROR
;
...
...
@@ -1026,7 +1036,8 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
colDataAssign
(
pDst
->
columnData
,
res
->
columnData
,
res
->
numOfRows
,
NULL
);
pDst
->
numOfRows
=
res
->
numOfRows
;
}
sclFreeParam
(
res
);
taosHashRemove
(
ctx
.
pRes
,
(
void
*
)
&
pNode
,
POINTER_BYTES
);
}
...
...
source/libs/scalar/src/sclvector.c
浏览文件 @
5dff136b
...
...
@@ -865,12 +865,11 @@ int32_t vectorGetConvertType(int32_t type1, int32_t type2) {
}
int32_t
vectorConvertScalarParam
(
SScalarParam
*
input
,
SScalarParam
*
output
,
int32_t
type
)
{
int32_t
code
=
0
;
SDataType
t
=
{.
type
=
type
,
.
bytes
=
tDataTypes
[
type
].
bytes
};
output
->
numOfRows
=
input
->
numOfRows
;
output
->
columnData
=
sclCreateColumnInfoData
(
&
t
,
input
->
numOfRows
);
if
(
output
->
columnData
==
NULL
)
{
int32_t
code
=
sclCreateColumnInfoData
(
&
t
,
input
->
numOfRows
,
output
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
...
...
@@ -940,13 +939,12 @@ static int32_t doConvertHelper(SScalarParam* pDest, int32_t* convert, const SSca
pDest
->
numOfRows
=
pParam
->
numOfRows
;
SDataType
t
=
{.
type
=
type
,
.
bytes
=
tDataTypes
[
type
].
bytes
};
pDest
->
columnData
=
sclCreateColumnInfoData
(
&
t
,
pParam
->
numOfRows
);
if
(
pDest
->
columnData
==
NULL
)
{
sclError
(
"malloc %d failed"
,
(
int32_t
)(
pParam
->
numOfRows
*
sizeof
(
double
)));
return
TSDB_CODE_OUT_OF_MEMORY
;
int32_t
code
=
sclCreateColumnInfoData
(
&
t
,
pParam
->
numOfRows
,
pDest
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
int32_t
code
=
vectorConvertImpl
(
pParam
,
pDest
);
code
=
vectorConvertImpl
(
pParam
,
pDest
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录