Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
53a0ff0b
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
53a0ff0b
编写于
3月 04, 2023
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor: add some logs.
上级
3645c52b
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
29 addition
and
17 deletion
+29
-17
source/client/src/clientTmq.c
source/client/src/clientTmq.c
+26
-14
source/dnode/mnode/impl/src/mndConsumer.c
source/dnode/mnode/impl/src/mndConsumer.c
+1
-1
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+2
-2
未找到文件。
source/client/src/clientTmq.c
浏览文件 @
53a0ff0b
...
@@ -1076,7 +1076,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
...
@@ -1076,7 +1076,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
SCMSubscribeReq
req
=
{
0
};
SCMSubscribeReq
req
=
{
0
};
int32_t
code
=
0
;
int32_t
code
=
0
;
tscDebug
(
"consumer:0x%"
PRIx64
"
subscribe %d topics"
,
tmq
->
consumer
Id
,
sz
);
tscDebug
(
"consumer:0x%"
PRIx64
"
cgroup:%s, subscribe %d topics"
,
tmq
->
consumerId
,
tmq
->
group
Id
,
sz
);
req
.
consumerId
=
tmq
->
consumerId
;
req
.
consumerId
=
tmq
->
consumerId
;
tstrncpy
(
req
.
clientId
,
tmq
->
clientId
,
256
);
tstrncpy
(
req
.
clientId
,
tmq
->
clientId
,
256
);
...
@@ -1213,31 +1213,38 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
...
@@ -1213,31 +1213,38 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
return
-
1
;
return
-
1
;
}
}
int32_t
epoch
=
pParam
->
epoch
;
int32_t
epoch
=
pParam
->
epoch
;
int32_t
vgId
=
pParam
->
vgId
;
int32_t
vgId
=
pParam
->
vgId
;
uint64_t
requestId
=
pParam
->
requestId
;
taosMemoryFree
(
pParam
);
taosMemoryFree
(
pParam
);
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
tscWarn
(
"msg discard from vgId:%d, epoch %d, since %s"
,
vgId
,
epoch
,
terrstr
());
tscWarn
(
"consumer:0x%"
PRIx64
" msg from vgId:%d discarded, epoch %d, since %s, reqId:0x%"
PRIx64
,
tmq
->
consumerId
,
vgId
,
epoch
,
tstrerror
(
code
),
requestId
);
if
(
pMsg
->
pData
)
taosMemoryFree
(
pMsg
->
pData
);
if
(
pMsg
->
pData
)
taosMemoryFree
(
pMsg
->
pData
);
if
(
pMsg
->
pEpSet
)
taosMemoryFree
(
pMsg
->
pEpSet
);
if
(
pMsg
->
pEpSet
)
taosMemoryFree
(
pMsg
->
pEpSet
);
// in case of consumer mismatch, wait for 500ms and retry
if
(
code
==
TSDB_CODE_TMQ_CONSUMER_MISMATCH
)
{
if
(
code
==
TSDB_CODE_TMQ_CONSUMER_MISMATCH
)
{
taosMsleep
(
500
);
atomic_store_8
(
&
tmq
->
status
,
TMQ_CONSUMER_STATUS__RECOVER
);
atomic_store_8
(
&
tmq
->
status
,
TMQ_CONSUMER_STATUS__RECOVER
);
goto
CREATE_MSG_FAIL
;
}
else
if
(
code
==
TSDB_CODE_TQ_NO_COMMITTED_OFFSET
)
{
}
if
(
code
==
TSDB_CODE_TQ_NO_COMMITTED_OFFSET
)
{
SMqPollRspWrapper
*
pRspWrapper
=
taosAllocateQitem
(
sizeof
(
SMqPollRspWrapper
),
DEF_QITEM
,
0
);
SMqPollRspWrapper
*
pRspWrapper
=
taosAllocateQitem
(
sizeof
(
SMqPollRspWrapper
),
DEF_QITEM
,
0
);
if
(
pRspWrapper
==
NULL
)
{
if
(
pRspWrapper
==
NULL
)
{
tscWarn
(
"msg discard from vgId:%d, epoch %d since out of memory"
,
vgId
,
epoch
);
tscWarn
(
"consumer:0x%"
PRIx64
" msg from vgId:%d discarded, epoch %d since out of memory, reqId:0x%"
PRIx64
,
tmq
->
consumerId
,
vgId
,
epoch
,
requestId
);
goto
CREATE_MSG_FAIL
;
goto
CREATE_MSG_FAIL
;
}
}
pRspWrapper
->
tmqRspType
=
TMQ_MSG_TYPE__END_RSP
;
pRspWrapper
->
tmqRspType
=
TMQ_MSG_TYPE__END_RSP
;
/*pRspWrapper->vgHandle = pVg;*/
/*pRspWrapper->vgHandle = pVg;*/
/*pRspWrapper->topicHandle = pTopic;*/
/*pRspWrapper->topicHandle = pTopic;*/
taosWriteQitem
(
tmq
->
mqueue
,
pRspWrapper
);
taosWriteQitem
(
tmq
->
mqueue
,
pRspWrapper
);
tsem_post
(
&
tmq
->
rspSem
);
tsem_post
(
&
tmq
->
rspSem
);
}
}
goto
CREATE_MSG_FAIL
;
goto
CREATE_MSG_FAIL
;
}
}
...
@@ -1245,8 +1252,9 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
...
@@ -1245,8 +1252,9 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
int32_t
tmqEpoch
=
atomic_load_32
(
&
tmq
->
epoch
);
int32_t
tmqEpoch
=
atomic_load_32
(
&
tmq
->
epoch
);
if
(
msgEpoch
<
tmqEpoch
)
{
if
(
msgEpoch
<
tmqEpoch
)
{
// do not write into queue since updating epoch reset
// do not write into queue since updating epoch reset
tscWarn
(
"msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d"
,
vgId
,
msgEpoch
,
tscWarn
(
"consumer:0x%"
PRIx64
" msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d, reqId:0x%"
PRIx64
,
tmqEpoch
);
tmq
->
consumerId
,
vgId
,
msgEpoch
,
tmqEpoch
,
requestId
);
tsem_post
(
&
tmq
->
rspSem
);
tsem_post
(
&
tmq
->
rspSem
);
taosMemoryFree
(
pMsg
->
pData
);
taosMemoryFree
(
pMsg
->
pData
);
taosMemoryFree
(
pMsg
->
pEpSet
);
taosMemoryFree
(
pMsg
->
pEpSet
);
...
@@ -1254,7 +1262,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
...
@@ -1254,7 +1262,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
}
}
if
(
msgEpoch
!=
tmqEpoch
)
{
if
(
msgEpoch
!=
tmqEpoch
)
{
tscWarn
(
"mismatch rsp from vgId:%d, epoch %d, current epoch %d"
,
vgId
,
msgEpoch
,
tmqEpoch
);
tscWarn
(
"consumer:0x%"
PRIx64
" mismatch rsp from vgId:%d, epoch %d, current epoch %d, reqId:0x%"
PRIx64
,
tmq
->
consumerId
,
vgId
,
msgEpoch
,
tmqEpoch
,
requestId
);
}
}
// handle meta rsp
// handle meta rsp
...
@@ -1264,7 +1273,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
...
@@ -1264,7 +1273,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
if
(
pRspWrapper
==
NULL
)
{
if
(
pRspWrapper
==
NULL
)
{
taosMemoryFree
(
pMsg
->
pData
);
taosMemoryFree
(
pMsg
->
pData
);
taosMemoryFree
(
pMsg
->
pEpSet
);
taosMemoryFree
(
pMsg
->
pEpSet
);
tscWarn
(
"
msg discard from vgId:%d, epoch %d since out of memory"
,
vgId
,
epoch
);
tscWarn
(
"
consumer:0x%"
PRIx64
" msg discard from vgId:%d, epoch %d since out of memory"
,
tmq
->
consumerId
,
vgId
,
epoch
);
goto
CREATE_MSG_FAIL
;
goto
CREATE_MSG_FAIL
;
}
}
...
@@ -1299,16 +1308,19 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
...
@@ -1299,16 +1308,19 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
taosMemoryFree
(
pMsg
->
pData
);
taosMemoryFree
(
pMsg
->
pData
);
taosMemoryFree
(
pMsg
->
pEpSet
);
taosMemoryFree
(
pMsg
->
pEpSet
);
tscDebug
(
"consumer:0x%"
PRIx64
", put poll res into mqueue, total in queue:%d"
,
tmq
->
consumerId
,
tmq
->
mqueue
->
numOfItems
);
tscDebug
(
"consumer:0x%"
PRIx64
", put poll res into mqueue, total in queue:%d, reqId:0x%"
PRIx64
,
tmq
->
consumerId
,
tmq
->
mqueue
->
numOfItems
,
requestId
);
taosWriteQitem
(
tmq
->
mqueue
,
pRspWrapper
);
taosWriteQitem
(
tmq
->
mqueue
,
pRspWrapper
);
tsem_post
(
&
tmq
->
rspSem
);
tsem_post
(
&
tmq
->
rspSem
);
return
0
;
return
0
;
CREATE_MSG_FAIL:
CREATE_MSG_FAIL:
if
(
epoch
==
tmq
->
epoch
)
{
if
(
epoch
==
tmq
->
epoch
)
{
atomic_store_32
(
&
pVg
->
vgStatus
,
TMQ_VG_STATUS__IDLE
);
atomic_store_32
(
&
pVg
->
vgStatus
,
TMQ_VG_STATUS__IDLE
);
}
}
tsem_post
(
&
tmq
->
rspSem
);
tsem_post
(
&
tmq
->
rspSem
);
return
-
1
;
return
-
1
;
}
}
...
...
source/dnode/mnode/impl/src/mndConsumer.c
浏览文件 @
53a0ff0b
...
@@ -26,7 +26,7 @@
...
@@ -26,7 +26,7 @@
#define MND_CONSUMER_VER_NUMBER 1
#define MND_CONSUMER_VER_NUMBER 1
#define MND_CONSUMER_RESERVE_SIZE 64
#define MND_CONSUMER_RESERVE_SIZE 64
#define MND_CONSUMER_LOST_HB_CNT
3
#define MND_CONSUMER_LOST_HB_CNT
6
#define MND_CONSUMER_LOST_CLEAR_THRESHOLD 43200
#define MND_CONSUMER_LOST_CLEAR_THRESHOLD 43200
static
int32_t
mqRebInExecCnt
=
0
;
static
int32_t
mqRebInExecCnt
=
0
;
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
53a0ff0b
...
@@ -296,7 +296,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
...
@@ -296,7 +296,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
.
pVgEp
=
pVgEp
,
.
pVgEp
=
pVgEp
,
};
};
taosHashPut
(
pHash
,
&
pVgEp
->
vgId
,
sizeof
(
int32_t
),
&
outputVg
,
sizeof
(
SMqRebOutputVg
));
taosHashPut
(
pHash
,
&
pVgEp
->
vgId
,
sizeof
(
int32_t
),
&
outputVg
,
sizeof
(
SMqRebOutputVg
));
mInfo
(
"sub:%s mq rebalance remove vgId:%d from consumer:%"
PRIx64
",(first scan)"
,
sub
,
pVgEp
->
vgId
,
mInfo
(
"sub:%s mq rebalance remove vgId:%d from consumer:
0x
%"
PRIx64
",(first scan)"
,
sub
,
pVgEp
->
vgId
,
pConsumerEp
->
consumerId
);
pConsumerEp
->
consumerId
);
}
}
imbCnt
++
;
imbCnt
++
;
...
@@ -311,7 +311,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
...
@@ -311,7 +311,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
.
pVgEp
=
pVgEp
,
.
pVgEp
=
pVgEp
,
};
};
taosHashPut
(
pHash
,
&
pVgEp
->
vgId
,
sizeof
(
int32_t
),
&
outputVg
,
sizeof
(
SMqRebOutputVg
));
taosHashPut
(
pHash
,
&
pVgEp
->
vgId
,
sizeof
(
int32_t
),
&
outputVg
,
sizeof
(
SMqRebOutputVg
));
mInfo
(
"sub:%s mq rebalance remove vgId:%d from consumer:%"
PRIx64
",(first scan)"
,
sub
,
pVgEp
->
vgId
,
mInfo
(
"sub:%s mq rebalance remove vgId:%d from consumer:
0x
%"
PRIx64
",(first scan)"
,
sub
,
pVgEp
->
vgId
,
pConsumerEp
->
consumerId
);
pConsumerEp
->
consumerId
);
}
}
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录