Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
976ec81a
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
976ec81a
编写于
5月 17, 2022
作者:
L
Liu Jicong
提交者:
GitHub
5月 17, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #12587 from taosdata/feature/stream
fix(tmq): fix memory leak
上级
7ab15e8f
d762ec63
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
91 addition
and
43 deletion
+91
-43
source/client/inc/clientInt.h
source/client/inc/clientInt.h
+5
-1
source/dnode/mnode/impl/src/mndConsumer.c
source/dnode/mnode/impl/src/mndConsumer.c
+13
-8
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+13
-3
source/dnode/mnode/impl/src/mndTopic.c
source/dnode/mnode/impl/src/mndTopic.c
+59
-31
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+1
-0
未找到文件。
source/client/inc/clientInt.h
浏览文件 @
976ec81a
...
...
@@ -234,6 +234,10 @@ static FORCE_INLINE SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool conver
if
(
msg
->
rsp
.
withSchema
)
{
SSchemaWrapper
*
pSW
=
(
SSchemaWrapper
*
)
taosArrayGetP
(
msg
->
rsp
.
blockSchema
,
msg
->
resIter
);
setResSchemaInfo
(
&
msg
->
resInfo
,
pSW
->
pSchema
,
pSW
->
nCols
);
taosMemoryFreeClear
(
msg
->
resInfo
.
row
);
taosMemoryFreeClear
(
msg
->
resInfo
.
pCol
);
taosMemoryFreeClear
(
msg
->
resInfo
.
length
);
taosMemoryFreeClear
(
msg
->
resInfo
.
convertBuf
);
}
setQueryResultFromRsp
(
&
msg
->
resInfo
,
pRetrieve
,
convertUcs4
);
return
&
msg
->
resInfo
;
...
...
@@ -310,7 +314,7 @@ void hbMgrInitMqHbRspHandle();
SRequestObj
*
launchQueryImpl
(
SRequestObj
*
pRequest
,
SQuery
*
pQuery
,
int32_t
code
,
bool
keepQuery
,
void
**
res
);
int32_t
getQueryPlan
(
SRequestObj
*
pRequest
,
SQuery
*
pQuery
,
SArray
**
pNodeList
);
int32_t
scheduleQuery
(
SRequestObj
*
pRequest
,
SQueryPlan
*
pDag
,
SArray
*
pNodeList
,
void
**
res
);
int32_t
refreshMeta
(
STscObj
*
pTscObj
,
SRequestObj
*
pRequest
);
int32_t
refreshMeta
(
STscObj
*
pTscObj
,
SRequestObj
*
pRequest
);
#ifdef __cplusplus
}
...
...
source/dnode/mnode/impl/src/mndConsumer.c
浏览文件 @
976ec81a
...
...
@@ -135,16 +135,18 @@ FAIL:
}
static
SMqRebInfo
*
mndGetOrCreateRebSub
(
SHashObj
*
pHash
,
const
char
*
key
)
{
SMqRebInfo
*
pReb
Sub
=
taosHashGet
(
pHash
,
key
,
strlen
(
key
)
+
1
);
if
(
pReb
Sub
==
NULL
)
{
pReb
Sub
=
tNewSMqRebSubscribe
(
key
);
if
(
pReb
Sub
==
NULL
)
{
SMqRebInfo
*
pReb
Info
=
taosHashGet
(
pHash
,
key
,
strlen
(
key
)
+
1
);
if
(
pReb
Info
==
NULL
)
{
pReb
Info
=
tNewSMqRebSubscribe
(
key
);
if
(
pReb
Info
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
taosHashPut
(
pHash
,
key
,
strlen
(
key
)
+
1
,
pRebSub
,
sizeof
(
SMqRebInfo
));
taosHashPut
(
pHash
,
key
,
strlen
(
key
)
+
1
,
pRebInfo
,
sizeof
(
SMqRebInfo
));
taosMemoryFree
(
pRebInfo
);
pRebInfo
=
taosHashGet
(
pHash
,
key
,
strlen
(
key
)
+
1
);
}
return
pReb
Sub
;
return
pReb
Info
;
}
static
int32_t
mndProcessMqTimerMsg
(
SNodeMsg
*
pMsg
)
{
...
...
@@ -305,8 +307,10 @@ static int32_t mndProcessAskEpReq(SNodeMsg *pMsg) {
ASSERT
(
pTopic
);
taosRLockLatch
(
&
pTopic
->
lock
);
topicEp
.
schema
.
nCols
=
pTopic
->
schema
.
nCols
;
topicEp
.
schema
.
pSchema
=
taosMemoryCalloc
(
topicEp
.
schema
.
nCols
,
sizeof
(
SSchema
));
memcpy
(
topicEp
.
schema
.
pSchema
,
pTopic
->
schema
.
pSchema
,
topicEp
.
schema
.
nCols
*
sizeof
(
SSchema
));
if
(
topicEp
.
schema
.
nCols
)
{
topicEp
.
schema
.
pSchema
=
taosMemoryCalloc
(
topicEp
.
schema
.
nCols
,
sizeof
(
SSchema
));
memcpy
(
topicEp
.
schema
.
pSchema
,
pTopic
->
schema
.
pSchema
,
topicEp
.
schema
.
nCols
*
sizeof
(
SSchema
));
}
taosRUnLockLatch
(
&
pTopic
->
lock
);
mndReleaseTopic
(
pMnode
,
pTopic
);
...
...
@@ -517,6 +521,7 @@ SUBSCRIBE_OVER:
}
if
(
pConsumerNew
)
{
tDeleteSMqConsumerObj
(
pConsumerNew
);
taosMemoryFree
(
pConsumerNew
);
}
// TODO: replace with destroy subscribe msg
if
(
subscribe
.
topicNames
)
taosArrayDestroyP
(
subscribe
.
topicNames
,
(
FDelete
)
taosMemoryFree
);
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
976ec81a
...
...
@@ -502,9 +502,9 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) {
SMqRebInputObj
rebInput
=
{
0
};
SMqRebOutputObj
rebOutput
=
{
0
};
rebOutput
.
newConsumers
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
rebOutput
.
removedConsumers
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
rebOutput
.
touchedConsumers
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
rebOutput
.
newConsumers
=
taosArrayInit
(
0
,
sizeof
(
int64_t
));
rebOutput
.
removedConsumers
=
taosArrayInit
(
0
,
sizeof
(
int64_t
));
rebOutput
.
touchedConsumers
=
taosArrayInit
(
0
,
sizeof
(
int64_t
));
rebOutput
.
rebVgs
=
taosArrayInit
(
0
,
sizeof
(
SMqRebOutputVg
));
SMqRebInfo
*
pRebInfo
=
(
SMqRebInfo
*
)
pIter
;
...
...
@@ -547,6 +547,16 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) {
if
(
mndPersistRebResult
(
pMnode
,
pMsg
,
&
rebOutput
)
<
0
)
{
mError
(
"persist rebalance output error, possibly vnode splitted or dropped"
);
}
taosArrayDestroy
(
pRebInfo
->
lostConsumers
);
taosArrayDestroy
(
pRebInfo
->
newConsumers
);
taosArrayDestroy
(
pRebInfo
->
removedConsumers
);
taosArrayDestroy
(
rebOutput
.
newConsumers
);
taosArrayDestroy
(
rebOutput
.
touchedConsumers
);
taosArrayDestroy
(
rebOutput
.
removedConsumers
);
taosArrayDestroy
(
rebOutput
.
rebVgs
);
tDeleteSubscribeObj
(
rebOutput
.
pSub
);
taosMemoryFree
(
rebOutput
.
pSub
);
}
// reset flag
...
...
source/dnode/mnode/impl/src/mndTopic.c
浏览文件 @
976ec81a
...
...
@@ -72,8 +72,15 @@ const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]) {
SSdbRaw
*
mndTopicActionEncode
(
SMqTopicObj
*
pTopic
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
int32_t
physicalPlanLen
=
strlen
(
pTopic
->
physicalPlan
)
+
1
;
int32_t
schemaLen
=
taosEncodeSSchemaWrapper
(
NULL
,
&
pTopic
->
schema
);
void
*
swBuf
=
NULL
;
int32_t
physicalPlanLen
=
0
;
if
(
pTopic
->
physicalPlan
)
{
physicalPlanLen
=
strlen
(
pTopic
->
physicalPlan
)
+
1
;
}
int32_t
schemaLen
=
0
;
if
(
pTopic
->
schema
.
nCols
)
{
schemaLen
=
taosEncodeSSchemaWrapper
(
NULL
,
&
pTopic
->
schema
);
}
int32_t
size
=
sizeof
(
SMqTopicObj
)
+
physicalPlanLen
+
pTopic
->
sqlLen
+
pTopic
->
astLen
+
schemaLen
+
MND_TOPIC_RESERVE_SIZE
;
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_TOPIC
,
MND_TOPIC_VER_NUMBER
,
size
);
...
...
@@ -96,18 +103,24 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
SDB_SET_INT32
(
pRaw
,
dataPos
,
pTopic
->
sqlLen
,
TOPIC_ENCODE_OVER
);
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
sql
,
pTopic
->
sqlLen
,
TOPIC_ENCODE_OVER
);
SDB_SET_INT32
(
pRaw
,
dataPos
,
pTopic
->
astLen
,
TOPIC_ENCODE_OVER
);
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
ast
,
pTopic
->
astLen
,
TOPIC_ENCODE_OVER
);
if
(
pTopic
->
astLen
)
{
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
ast
,
pTopic
->
astLen
,
TOPIC_ENCODE_OVER
);
}
SDB_SET_INT32
(
pRaw
,
dataPos
,
physicalPlanLen
,
TOPIC_ENCODE_OVER
);
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
physicalPlan
,
physicalPlanLen
,
TOPIC_ENCODE_OVER
);
void
*
swBuf
=
taosMemoryMalloc
(
schemaLen
);
if
(
swBuf
==
NULL
)
{
goto
TOPIC_ENCODE_OVER
;
if
(
physicalPlanLen
)
{
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
physicalPlan
,
physicalPlanLen
,
TOPIC_ENCODE_OVER
);
}
void
*
aswBuf
=
swBuf
;
taosEncodeSSchemaWrapper
(
&
aswBuf
,
&
pTopic
->
schema
);
SDB_SET_INT32
(
pRaw
,
dataPos
,
schemaLen
,
TOPIC_ENCODE_OVER
);
SDB_SET_BINARY
(
pRaw
,
dataPos
,
swBuf
,
schemaLen
,
TOPIC_ENCODE_OVER
);
if
(
schemaLen
)
{
swBuf
=
taosMemoryMalloc
(
schemaLen
);
if
(
swBuf
==
NULL
)
{
goto
TOPIC_ENCODE_OVER
;
}
void
*
aswBuf
=
swBuf
;
taosEncodeSSchemaWrapper
(
&
aswBuf
,
&
pTopic
->
schema
);
SDB_SET_BINARY
(
pRaw
,
dataPos
,
swBuf
,
schemaLen
,
TOPIC_ENCODE_OVER
);
}
SDB_SET_INT32
(
pRaw
,
dataPos
,
pTopic
->
refConsumerCnt
,
TOPIC_ENCODE_OVER
);
SDB_SET_RESERVE
(
pRaw
,
dataPos
,
MND_TOPIC_RESERVE_SIZE
,
TOPIC_ENCODE_OVER
);
...
...
@@ -116,6 +129,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
terrno
=
TSDB_CODE_SUCCESS
;
TOPIC_ENCODE_OVER:
if
(
swBuf
)
taosMemoryFree
(
swBuf
);
if
(
terrno
!=
TSDB_CODE_SUCCESS
)
{
mError
(
"topic:%s, failed to encode to raw:%p since %s"
,
pTopic
->
name
,
pRaw
,
terrstr
());
sdbFreeRaw
(
pRaw
);
...
...
@@ -168,29 +182,43 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
sql
,
pTopic
->
sqlLen
,
TOPIC_DECODE_OVER
);
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pTopic
->
astLen
,
TOPIC_DECODE_OVER
);
pTopic
->
ast
=
taosMemoryCalloc
(
pTopic
->
astLen
,
sizeof
(
char
));
if
(
pTopic
->
ast
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
TOPIC_DECODE_OVER
;
if
(
pTopic
->
astLen
)
{
pTopic
->
ast
=
taosMemoryCalloc
(
pTopic
->
astLen
,
sizeof
(
char
));
if
(
pTopic
->
ast
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
TOPIC_DECODE_OVER
;
}
}
else
{
pTopic
->
ast
=
NULL
;
}
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
ast
,
pTopic
->
astLen
,
TOPIC_DECODE_OVER
);
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
len
,
TOPIC_DECODE_OVER
);
pTopic
->
physicalPlan
=
taosMemoryCalloc
(
len
,
sizeof
(
char
));
if
(
pTopic
->
physicalPlan
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
TOPIC_DECODE_OVER
;
if
(
len
)
{
pTopic
->
physicalPlan
=
taosMemoryCalloc
(
len
,
sizeof
(
char
));
if
(
pTopic
->
physicalPlan
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
TOPIC_DECODE_OVER
;
}
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
physicalPlan
,
len
,
TOPIC_DECODE_OVER
);
}
else
{
pTopic
->
physicalPlan
=
NULL
;
}
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
physicalPlan
,
len
,
TOPIC_DECODE_OVER
);
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
len
,
TOPIC_DECODE_OVER
);
void
*
buf
=
taosMemoryMalloc
(
len
);
if
(
buf
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
TOPIC_DECODE_OVER
;
}
SDB_GET_BINARY
(
pRaw
,
dataPos
,
buf
,
len
,
TOPIC_DECODE_OVER
);
if
(
taosDecodeSSchemaWrapper
(
buf
,
&
pTopic
->
schema
)
==
NULL
)
{
goto
TOPIC_DECODE_OVER
;
if
(
len
)
{
void
*
buf
=
taosMemoryMalloc
(
len
);
if
(
buf
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
TOPIC_DECODE_OVER
;
}
SDB_GET_BINARY
(
pRaw
,
dataPos
,
buf
,
len
,
TOPIC_DECODE_OVER
);
if
(
taosDecodeSSchemaWrapper
(
buf
,
&
pTopic
->
schema
)
==
NULL
)
{
goto
TOPIC_DECODE_OVER
;
}
}
else
{
pTopic
->
schema
.
nCols
=
0
;
pTopic
->
schema
.
sver
=
0
;
pTopic
->
schema
.
pSchema
=
NULL
;
}
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pTopic
->
refConsumerCnt
,
TOPIC_DECODE_OVER
);
...
...
@@ -340,9 +368,9 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq
return
-
1
;
}
}
else
{
topicObj
.
ast
=
strdup
(
""
)
;
topicObj
.
astLen
=
1
;
topicObj
.
physicalPlan
=
strdup
(
""
)
;
topicObj
.
ast
=
NULL
;
topicObj
.
astLen
=
0
;
topicObj
.
physicalPlan
=
NULL
;
topicObj
.
subType
=
TOPIC_SUB_TYPE__DB
;
topicObj
.
withTbName
=
1
;
topicObj
.
withSchema
=
1
;
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
976ec81a
...
...
@@ -613,6 +613,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
fetchOffset
++
;
}
taosMemoryFree
(
pHeadWithCkSum
);
ASSERT
(
taosArrayGetSize
(
rsp
.
blockData
)
==
rsp
.
blockNum
);
ASSERT
(
taosArrayGetSize
(
rsp
.
blockDataLen
)
==
rsp
.
blockNum
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录