Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a5e6bac8
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
a5e6bac8
编写于
3月 16, 2023
作者:
H
Haojun Liao
提交者:
GitHub
3月 16, 2023
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #20496 from taosdata/fix/liaohj
fix(tmq): release the tmq properly, and add some logs for subscription.
上级
9638b492
5142ccf6
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
78 addition
and
28 deletion
+78
-28
source/client/src/clientTmq.c
source/client/src/clientTmq.c
+78
-28
未找到文件。
source/client/src/clientTmq.c
浏览文件 @
a5e6bac8
...
...
@@ -91,7 +91,9 @@ struct tmq_t {
int8_t epStatus;
int32_t epSkipCnt;
#endif
int64_t
pollCnt
;
// poll info
int64_t
pollCnt
;
int64_t
totalRows
;
// timer
tmr_h
hbLiveTimer
;
...
...
@@ -127,6 +129,7 @@ enum {
typedef
struct
{
int64_t
pollCnt
;
int64_t
numOfRows
;
STqOffsetVal
committedOffset
;
STqOffsetVal
currentOffset
;
int32_t
vgId
;
...
...
@@ -629,8 +632,7 @@ FAIL:
return
0
;
}
static
int32_t
doAutoCommit
(
tmq_t
*
tmq
,
int8_t
automatic
,
int8_t
async
,
tmq_commit_cb
*
userCb
,
void
*
userParam
)
{
static
int32_t
doAutoCommit
(
tmq_t
*
tmq
,
int8_t
automatic
,
int8_t
async
,
tmq_commit_cb
*
userCb
,
void
*
userParam
)
{
int32_t
code
=
-
1
;
SMqCommitCbParamSet
*
pParamSet
=
taosMemoryCalloc
(
1
,
sizeof
(
SMqCommitCbParamSet
));
...
...
@@ -734,6 +736,7 @@ static void generateTimedTask(int64_t refId, int32_t type) {
taosWriteQitem
(
tmq
->
delayedTask
,
pTaskType
);
tsem_post
(
&
tmq
->
rspSem
);
}
taosReleaseRef
(
tmqMgmt
.
rsetId
,
refId
);
}
void
tmqAssignAskEpTask
(
void
*
param
,
void
*
tmrId
)
{
...
...
@@ -757,6 +760,8 @@ void tmqAssignDelayedReportTask(void* param, void* tmrId) {
taosWriteQitem
(
tmq
->
delayedTask
,
pTaskType
);
tsem_post
(
&
tmq
->
rspSem
);
}
taosReleaseRef
(
tmqMgmt
.
rsetId
,
refId
);
taosMemoryFree
(
param
);
}
...
...
@@ -770,6 +775,7 @@ int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
void
tmqSendHbReq
(
void
*
param
,
void
*
tmrId
)
{
int64_t
refId
=
*
(
int64_t
*
)
param
;
tmq_t
*
tmq
=
taosAcquireRef
(
tmqMgmt
.
rsetId
,
refId
);
if
(
tmq
==
NULL
)
{
taosMemoryFree
(
param
);
...
...
@@ -783,17 +789,19 @@ void tmqSendHbReq(void* param, void* tmrId) {
int32_t
tlen
=
tSerializeSMqHbReq
(
NULL
,
0
,
&
req
);
if
(
tlen
<
0
)
{
tscError
(
"tSerializeSMqHbReq failed"
);
return
;
goto
OVER
;
}
void
*
pReq
=
taosMemoryCalloc
(
1
,
tlen
);
if
(
tlen
<
0
)
{
tscError
(
"failed to malloc MqHbReq msg, size:%d"
,
tlen
);
return
;
goto
OVER
;
}
if
(
tSerializeSMqHbReq
(
pReq
,
tlen
,
&
req
)
<
0
)
{
tscError
(
"tSerializeSMqHbReq %d failed"
,
tlen
);
taosMemoryFree
(
pReq
);
return
;
goto
OVER
;
}
SMsgSendInfo
*
sendInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SMsgSendInfo
));
...
...
@@ -801,6 +809,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
taosMemoryFree
(
pReq
);
goto
OVER
;
}
sendInfo
->
msgInfo
=
(
SDataBuf
){
.
pData
=
pReq
,
.
len
=
tlen
,
...
...
@@ -820,6 +829,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
OVER:
taosTmrReset
(
tmqSendHbReq
,
1000
,
param
,
tmqMgmt
.
timer
,
&
tmq
->
hbLiveTimer
);
taosReleaseRef
(
tmqMgmt
.
rsetId
,
refId
);
}
int32_t
tmqHandleAllDelayedTask
(
tmq_t
*
pTmq
)
{
...
...
@@ -960,8 +970,15 @@ int32_t tmq_unsubscribe(tmq_t* tmq) {
return
rsp
;
}
static
void
freeClientVgImpl
(
void
*
param
)
{
SMqClientTopic
*
pTopic
=
param
;
taosMemoryFreeClear
(
pTopic
->
schema
.
pSchema
);
taosArrayDestroy
(
pTopic
->
vgs
);
}
void
tmqFreeImpl
(
void
*
handle
)
{
tmq_t
*
tmq
=
(
tmq_t
*
)
handle
;
tmq_t
*
tmq
=
(
tmq_t
*
)
handle
;
int64_t
id
=
tmq
->
consumerId
;
// TODO stop timer
if
(
tmq
->
mqueue
)
{
...
...
@@ -977,16 +994,11 @@ void tmqFreeImpl(void* handle) {
tsem_destroy
(
&
tmq
->
rspSem
);
taosThreadMutexDestroy
(
&
tmq
->
lock
);
int32_t
sz
=
taosArrayGetSize
(
tmq
->
clientTopics
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqClientTopic
*
pTopic
=
taosArrayGet
(
tmq
->
clientTopics
,
i
);
taosMemoryFreeClear
(
pTopic
->
schema
.
pSchema
);
taosArrayDestroy
(
pTopic
->
vgs
);
}
taosArrayDestroy
(
tmq
->
clientTopics
);
taosArrayDestroyEx
(
tmq
->
clientTopics
,
freeClientVgImpl
);
taos_close_internal
(
tmq
->
pTscObj
);
taosMemoryFree
(
tmq
);
tscDebug
(
"consumer:0x%"
PRIx64
" closed"
,
id
);
}
static
void
tmqMgmtInit
(
void
)
{
...
...
@@ -1086,8 +1098,8 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
char
buf
[
80
]
=
{
0
};
STqOffsetVal
offset
=
{.
type
=
pTmq
->
resetOffsetCfg
};
tFormatOffset
(
buf
,
tListLen
(
buf
),
&
offset
);
tscInfo
(
"consumer:0x%"
PRIx64
" is setup, groupId:%s, snapshot:%d, autoCommit:%d, commitInterval:%dms, offset:%s, backgroudHB:%d"
,
pTmq
->
consumerId
,
pTmq
->
groupId
,
pTmq
->
useSnapshot
,
pTmq
->
autoCommit
,
pTmq
->
autoCommitInterval
,
buf
,
tscInfo
(
"consumer:0x%"
PRIx64
" is setup,
refId:%"
PRId64
",
groupId:%s, snapshot:%d, autoCommit:%d, commitInterval:%dms, offset:%s, backgroudHB:%d"
,
pTmq
->
consumerId
,
pTmq
->
refId
,
pTmq
->
groupId
,
pTmq
->
useSnapshot
,
pTmq
->
autoCommit
,
pTmq
->
autoCommitInterval
,
buf
,
pTmq
->
hbBgEnable
);
return
pTmq
;
...
...
@@ -1228,10 +1240,12 @@ void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* para
int32_t
tmqPollCb
(
void
*
param
,
SDataBuf
*
pMsg
,
int32_t
code
)
{
SMqPollCbParam
*
pParam
=
(
SMqPollCbParam
*
)
param
;
int64_t
refId
=
pParam
->
refId
;
SMqClientVg
*
pVg
=
pParam
->
pVg
;
SMqClientTopic
*
pTopic
=
pParam
->
pTopic
;
tmq_t
*
tmq
=
taosAcquireRef
(
tmqMgmt
.
rsetId
,
pParam
->
refId
);
tmq_t
*
tmq
=
taosAcquireRef
(
tmqMgmt
.
rsetId
,
refId
);
if
(
tmq
==
NULL
)
{
tsem_destroy
(
&
pParam
->
rspSem
);
taosMemoryFree
(
pParam
);
...
...
@@ -1282,6 +1296,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tmq
->
consumerId
,
vgId
,
msgEpoch
,
tmqEpoch
,
requestId
);
tsem_post
(
&
tmq
->
rspSem
);
taosReleaseRef
(
tmqMgmt
.
rsetId
,
refId
);
taosMemoryFree
(
pMsg
->
pData
);
taosMemoryFree
(
pMsg
->
pEpSet
);
return
0
;
...
...
@@ -1344,6 +1360,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tmq
->
consumerId
,
rspType
,
vgId
,
tmq
->
mqueue
->
numOfItems
,
requestId
);
tsem_post
(
&
tmq
->
rspSem
);
taosReleaseRef
(
tmqMgmt
.
rsetId
,
refId
);
return
0
;
CREATE_MSG_FAIL:
...
...
@@ -1352,6 +1370,8 @@ CREATE_MSG_FAIL:
}
tsem_post
(
&
tmq
->
rspSem
);
taosReleaseRef
(
tmqMgmt
.
rsetId
,
refId
);
return
-
1
;
}
...
...
@@ -1389,6 +1409,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
.
vgStatus
=
TMQ_VG_STATUS__IDLE
,
.
vgSkipCnt
=
0
,
.
emptyBlockReceiveTs
=
0
,
.
numOfRows
=
0
,
};
taosArrayPush
(
pTopic
->
vgs
,
&
clientVg
);
...
...
@@ -1540,6 +1561,8 @@ static int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
}
END:
taosReleaseRef
(
tmqMgmt
.
rsetId
,
pParam
->
refId
);
if
(
!
async
)
{
tsem_post
(
&
pParam
->
rspSem
);
}
else
{
...
...
@@ -1579,7 +1602,7 @@ SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
return
pRspObj
;
}
SMqRspObj
*
tmqBuildRspFromWrapper
(
SMqPollRspWrapper
*
pWrapper
)
{
SMqRspObj
*
tmqBuildRspFromWrapper
(
SMqPollRspWrapper
*
pWrapper
,
SMqClientVg
*
pVg
)
{
SMqRspObj
*
pRspObj
=
taosMemoryCalloc
(
1
,
sizeof
(
SMqRspObj
));
pRspObj
->
resType
=
RES_TYPE__TMQ
;
...
...
@@ -1597,6 +1620,14 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
setResSchemaInfo
(
&
pRspObj
->
resInfo
,
pWrapper
->
topicHandle
->
schema
.
pSchema
,
pWrapper
->
topicHandle
->
schema
.
nCols
);
}
// extract the rows in this data packet
for
(
int32_t
i
=
0
;
i
<
pRspObj
->
rsp
.
blockNum
;
++
i
)
{
SRetrieveTableRsp
*
pRetrieve
=
(
SRetrieveTableRsp
*
)
taosArrayGetP
(
pRspObj
->
rsp
.
blockData
,
i
);
int64_t
rows
=
htobe64
(
pRetrieve
->
numOfRows
);
pRspObj
->
resInfo
.
totalRows
+=
rows
;
pVg
->
numOfRows
+=
rows
;
}
return
pRspObj
;
}
...
...
@@ -1808,10 +1839,11 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
rspWrapper
=
NULL
;
continue
;
}
else
{
// build rsp
SMqRspObj
*
pRsp
=
tmqBuildRspFromWrapper
(
pollRspWrapper
);
tscDebug
(
"consumer:0x%"
PRIx64
" process poll rsp, vgId:%d, offset:%s, blocks:%d, reqId:0x%"
PRIx64
,
tmq
->
consumerId
,
pVg
->
vgId
,
buf
,
pDataRsp
->
blockNum
,
pollRspWrapper
->
reqId
);
SMqRspObj
*
pRsp
=
tmqBuildRspFromWrapper
(
pollRspWrapper
,
pVg
);
tscDebug
(
"consumer:0x%"
PRIx64
" process poll rsp, vgId:%d, offset:%s, blocks:%d, r
ows:%"
PRId64
" r
eqId:0x%"
PRIx64
,
tmq
->
consumerId
,
pVg
->
vgId
,
buf
,
pDataRsp
->
blockNum
,
p
Rsp
->
resInfo
.
totalRows
,
p
ollRspWrapper
->
reqId
);
tmq
->
totalRows
+=
pRsp
->
resInfo
.
totalRows
;
taosFreeQitem
(
pollRspWrapper
);
return
pRsp
;
}
...
...
@@ -1864,12 +1896,11 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
// build rsp
void
*
pRsp
=
NULL
;
if
(
pollRspWrapper
->
taosxRsp
.
createTableNum
==
0
)
{
pRsp
=
tmqBuildRspFromWrapper
(
pollRspWrapper
);
pRsp
=
tmqBuildRspFromWrapper
(
pollRspWrapper
,
pVg
);
}
else
{
pRsp
=
tmqBuildTaosxRspFromWrapper
(
pollRspWrapper
);
}
char
buf
[
80
];
tFormatOffset
(
buf
,
80
,
&
pVg
->
currentOffset
);
tscDebug
(
"consumer:0x%"
PRIx64
" process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, reqId:0x%"
PRIx64
,
tmq
->
consumerId
,
pVg
->
vgId
,
...
...
@@ -1957,9 +1988,6 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
tmq
->
consumerId
,
tmq
->
epoch
,
startTime
,
currentTime
);
return
NULL
;
}
/*tscInfo("consumer:0x%" PRIx64 ", (epoch %d) wait, start time %" PRId64 ", current time %" PRId64*/
/*", left time %" PRId64,*/
/*tmq->consumerId, tmq->epoch, startTime, currentTime, (timeout - elapsedTime));*/
tsem_timewait
(
&
tmq
->
rspSem
,
(
timeout
-
elapsedTime
));
}
else
{
// use tsem_timewait instead of tsem_wait to avoid unexpected stuck
...
...
@@ -1975,6 +2003,24 @@ int32_t tmq_consumer_close(tmq_t* tmq) {
return
rsp
;
}
int32_t
numOfTopics
=
taosArrayGetSize
(
tmq
->
clientTopics
);
tscDebug
(
"consumer:0x%"
PRIx64
" closing poll:%"
PRId64
" rows:%"
PRId64
" topics:%d, final epoch:%d"
,
tmq
->
consumerId
,
tmq
->
pollCnt
,
tmq
->
totalRows
,
numOfTopics
,
tmq
->
epoch
);
tscDebug
(
"consumer:0x%"
PRIx64
" rows dist begin: "
,
tmq
->
consumerId
);
for
(
int32_t
i
=
0
;
i
<
numOfTopics
;
++
i
)
{
SMqClientTopic
*
pTopics
=
taosArrayGet
(
tmq
->
clientTopics
,
i
);
tscDebug
(
"consumer:0x%"
PRIx64
" topic:%d"
,
tmq
->
consumerId
,
i
);
int32_t
numOfVgs
=
taosArrayGetSize
(
pTopics
->
vgs
);
for
(
int32_t
j
=
0
;
j
<
numOfVgs
;
++
j
)
{
SMqClientVg
*
pVg
=
taosArrayGet
(
pTopics
->
vgs
,
j
);
tscDebug
(
"topic:%s, %d. vgId:%d rows:%"
PRId64
,
pTopics
->
topicName
,
j
,
pVg
->
vgId
,
pVg
->
numOfRows
);
}
}
tscDebug
(
"consumer:0x%"
PRIx64
" rows dist end"
,
tmq
->
consumerId
);
int32_t
retryCnt
=
0
;
tmq_list_t
*
lst
=
tmq_list_new
();
while
(
1
)
{
...
...
@@ -2177,7 +2223,9 @@ int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg) {
}
int32_t
tmqCommitDone
(
SMqCommitCbParamSet
*
pParamSet
)
{
tmq_t
*
tmq
=
taosAcquireRef
(
tmqMgmt
.
rsetId
,
pParamSet
->
refId
);
int64_t
refId
=
pParamSet
->
refId
;
tmq_t
*
tmq
=
taosAcquireRef
(
tmqMgmt
.
rsetId
,
refId
);
if
(
tmq
==
NULL
)
{
if
(
!
pParamSet
->
async
)
{
tsem_destroy
(
&
pParamSet
->
rspSem
);
...
...
@@ -2205,6 +2253,8 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
#endif
taosReleaseRef
(
tmqMgmt
.
rsetId
,
refId
);
return
0
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录