Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
36428ffe
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
36428ffe
编写于
4月 29, 2022
作者:
L
Liu Jicong
提交者:
GitHub
4月 29, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #12056 from taosdata/feature/tq
fix: memory error
上级
72d95940
4402833a
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
22 addition
and
11 deletion
+22
-11
example/src/tmq.c
example/src/tmq.c
+5
-2
source/client/src/tmq.c
source/client/src/tmq.c
+17
-9
未找到文件。
example/src/tmq.c
浏览文件 @
36428ffe
...
...
@@ -14,7 +14,9 @@
*/
#include <assert.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "taos.h"
...
...
@@ -24,7 +26,7 @@ static void msg_process(TAOS_RES* msg) {
char
buf
[
1024
];
memset
(
buf
,
0
,
1024
);
printf
(
"topic: %s
\n
"
,
tmq_get_topic_name
(
msg
));
printf
(
"vg:%d
\n
"
,
tmq_get_vgroup_id
(
msg
));
printf
(
"vg:
%d
\n
"
,
tmq_get_vgroup_id
(
msg
));
while
(
1
)
{
TAOS_ROW
row
=
taos_fetch_row
(
msg
);
if
(
row
==
NULL
)
break
;
...
...
@@ -141,7 +143,7 @@ int32_t create_topic() {
}
void
tmq_commit_cb_print
(
tmq_t
*
tmq
,
tmq_resp_err_t
resp
,
tmq_topic_vgroup_list_t
*
offsets
,
void
*
param
)
{
printf
(
"commit %d
\n
"
,
resp
);
printf
(
"commit %d
tmq %p offsets %p param %p
\n
"
,
resp
,
tmq
,
offsets
,
param
);
}
tmq_t
*
build_consumer
()
{
...
...
@@ -232,6 +234,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
msg_process
(
tmqmessage
);
taos_free_result
(
tmqmessage
);
tmq_commit
(
tmq
,
NULL
,
1
);
/*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/
}
}
...
...
source/client/src/tmq.c
浏览文件 @
36428ffe
...
...
@@ -377,7 +377,15 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
if
(
pParam
->
tmq
->
commitCb
)
{
pParam
->
tmq
->
commitCb
(
pParam
->
tmq
,
pParam
->
rspErr
,
NULL
,
pParam
->
tmq
->
commitCbUserParam
);
}
if
(
!
pParam
->
async
)
tsem_post
(
&
pParam
->
rspSem
);
if
(
!
pParam
->
async
)
tsem_post
(
&
pParam
->
rspSem
);
else
{
tsem_destroy
(
&
pParam
->
rspSem
);
/*if (pParam->pArray) {*/
/*taosArrayDestroy(pParam->pArray);*/
/*}*/
taosMemoryFree
(
pParam
);
}
return
0
;
}
...
...
@@ -560,7 +568,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in
tscError
(
"failed to malloc request"
);
}
SMqCommitCbParam
*
pParam
=
taosMemory
Malloc
(
sizeof
(
SMqCommitCbParam
));
SMqCommitCbParam
*
pParam
=
taosMemory
Calloc
(
1
,
sizeof
(
SMqCommitCbParam
));
if
(
pParam
==
NULL
)
{
return
-
1
;
}
...
...
@@ -575,6 +583,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in
};
SMsgSendInfo
*
sendInfo
=
buildMsgInfoImpl
(
pRequest
);
sendInfo
->
requestObjRefId
=
0
;
sendInfo
->
param
=
pParam
;
sendInfo
->
fp
=
tmqCommitCb
;
SEpSet
epSet
=
getEpSet_s
(
&
tmq
->
pTscObj
->
pAppInfo
->
mgmtEp
);
...
...
@@ -585,13 +594,12 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in
if
(
!
async
)
{
tsem_wait
(
&
pParam
->
rspSem
);
resp
=
pParam
->
rspErr
;
}
tsem_destroy
(
&
pParam
->
rspSem
);
taosMemoryFree
(
pParam
);
tsem_destroy
(
&
pParam
->
rspSem
);
taosMemoryFree
(
pParam
);
if
(
pArray
)
{
taosArrayDestroy
(
pArray
);
if
(
pArray
)
{
taosArrayDestroy
(
pArray
);
}
}
return
resp
;
...
...
@@ -1313,7 +1321,7 @@ const char* tmq_err2str(tmq_resp_err_t err) {
const
char
*
tmq_get_topic_name
(
TAOS_RES
*
res
)
{
if
(
TD_RES_TMQ
(
res
))
{
SMqRspObj
*
pRspObj
=
(
SMqRspObj
*
)
res
;
return
pRspObj
->
topic
;
return
strchr
(
pRspObj
->
topic
,
'.'
)
+
1
;
}
else
{
return
NULL
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录