Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
9a5b7ce2
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
9a5b7ce2
编写于
2月 21, 2023
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor: do some internal refactor.
上级
466d5fe7
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
102 addition
and
84 deletion
+102
-84
include/common/tmsg.h
include/common/tmsg.h
+4
-4
source/dnode/mnode/impl/src/mndConsumer.c
source/dnode/mnode/impl/src/mndConsumer.c
+34
-25
source/dnode/mnode/impl/src/mndScheduler.c
source/dnode/mnode/impl/src/mndScheduler.c
+14
-15
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+49
-39
source/dnode/mnode/impl/src/mndTopic.c
source/dnode/mnode/impl/src/mndTopic.c
+1
-1
未找到文件。
include/common/tmsg.h
浏览文件 @
9a5b7ce2
...
...
@@ -1830,10 +1830,10 @@ typedef struct {
}
SMqConsumerLostMsg
,
SMqConsumerRecoverMsg
,
SMqConsumerClearMsg
;
typedef
struct
{
int64_t
consumerId
;
char
cgroup
[
TSDB_CGROUP_LEN
];
char
clientId
[
256
];
SArray
*
topicNames
;
// SArray<char**>
u
int64_t
consumerId
;
char
cgroup
[
TSDB_CGROUP_LEN
];
char
clientId
[
256
];
SArray
*
topicNames
;
// SArray<char**>
}
SCMSubscribeReq
;
static
FORCE_INLINE
int32_t
tSerializeSCMSubscribeReq
(
void
**
buf
,
const
SCMSubscribeReq
*
pReq
)
{
...
...
source/dnode/mnode/impl/src/mndConsumer.c
浏览文件 @
9a5b7ce2
...
...
@@ -238,7 +238,9 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
// iterate all consumers, find all modification
while
(
1
)
{
pIter
=
sdbFetch
(
pSdb
,
SDB_CONSUMER
,
pIter
,
(
void
**
)
&
pConsumer
);
if
(
pIter
==
NULL
)
break
;
if
(
pIter
==
NULL
)
{
break
;
}
int32_t
hbStatus
=
atomic_add_fetch_32
(
&
pConsumer
->
hbStatus
,
1
);
int32_t
status
=
atomic_load_32
(
&
pConsumer
->
status
);
...
...
@@ -335,7 +337,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
int64_t
consumerId
=
req
.
consumerId
;
SMqConsumerObj
*
pConsumer
=
mndAcquireConsumer
(
pMnode
,
consumerId
);
if
(
pConsumer
==
NULL
)
{
mError
(
"consumer
%"
PRId
64
" not exist"
,
consumerId
);
mError
(
"consumer
:0x%"
PRIx
64
" not exist"
,
consumerId
);
terrno
=
TSDB_CODE_MND_CONSUMER_NOT_EXIST
;
return
-
1
;
}
...
...
@@ -345,7 +347,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
int32_t
status
=
atomic_load_32
(
&
pConsumer
->
status
);
if
(
status
==
MQ_CONSUMER_STATUS__LOST_REBD
)
{
mInfo
(
"try to recover consumer
%"
PRId
64
""
,
consumerId
);
mInfo
(
"try to recover consumer
:0x%"
PRIx
64
""
,
consumerId
);
SMqConsumerRecoverMsg
*
pRecoverMsg
=
rpcMallocCont
(
sizeof
(
SMqConsumerRecoverMsg
));
pRecoverMsg
->
consumerId
=
consumerId
;
...
...
@@ -390,7 +392,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
#if 1
if
(
status
==
MQ_CONSUMER_STATUS__LOST_REBD
)
{
mInfo
(
"try to recover consumer
%"
PRId
64
""
,
consumerId
);
mInfo
(
"try to recover consumer
:0x%"
PRIx
64
""
,
consumerId
);
SMqConsumerRecoverMsg
*
pRecoverMsg
=
rpcMallocCont
(
sizeof
(
SMqConsumerRecoverMsg
));
pRecoverMsg
->
consumerId
=
consumerId
;
...
...
@@ -404,7 +406,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) {
#endif
if
(
status
!=
MQ_CONSUMER_STATUS__READY
)
{
mInfo
(
"consumer
%"
PRId
64
" not ready, status: %s"
,
consumerId
,
mndConsumerStatusName
(
status
));
mInfo
(
"consumer
:0x%"
PRIx
64
" not ready, status: %s"
,
consumerId
,
mndConsumerStatusName
(
status
));
terrno
=
TSDB_CODE_MND_CONSUMER_NOT_READY
;
return
-
1
;
}
...
...
@@ -526,12 +528,14 @@ int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj
return
0
;
}
static
int32_t
mndProcessSubscribeReq
(
SRpcMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
info
.
node
;
char
*
msgStr
=
pMsg
->
pCont
;
int32_t
mndProcessSubscribeReq
(
SRpcMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
info
.
node
;
char
*
msgStr
=
pMsg
->
pCont
;
SCMSubscribeReq
subscribe
=
{
0
};
tDeserializeSCMSubscribeReq
(
msgStr
,
&
subscribe
);
int64_t
consumerId
=
subscribe
.
consumerId
;
uint64_t
consumerId
=
subscribe
.
consumerId
;
char
*
cgroup
=
subscribe
.
cgroup
;
SMqConsumerObj
*
pConsumerOld
=
NULL
;
SMqConsumerObj
*
pConsumerNew
=
NULL
;
...
...
@@ -542,21 +546,23 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
taosArrayRemoveDuplicateP
(
newSub
,
taosArrayCompareString
,
taosMemoryFree
);
int32_t
newTopicNum
=
taosArrayGetSize
(
newSub
);
// check topic existance
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_CONFLICT_NOTHING
,
pMsg
,
"subscribe"
);
if
(
pTrans
==
NULL
)
goto
SUBSCRIBE_OVER
;
if
(
pTrans
==
NULL
)
{
goto
_over
;
}
for
(
int32_t
i
=
0
;
i
<
newTopicNum
;
i
++
)
{
char
*
topic
=
taosArrayGetP
(
newSub
,
i
);
SMqTopicObj
*
pTopic
=
mndAcquireTopic
(
pMnode
,
topic
);
if
(
pTopic
==
NULL
)
{
terrno
=
TSDB_CODE_MND_TOPIC_NOT_EXIST
;
goto
SUBSCRIBE_OVER
;
if
(
pTopic
==
NULL
)
{
// terrno has been set by callee function
goto
_over
;
}
if
(
mndCheckTopicPrivilege
(
pMnode
,
pMsg
->
info
.
conn
.
user
,
MND_OPER_SUBSCRIBE
,
pTopic
)
!=
0
)
{
mndReleaseTopic
(
pMnode
,
pTopic
);
goto
SUBSCRIBE_OVER
;
goto
_over
;
}
mndReleaseTopic
(
pMnode
,
pTopic
);
...
...
@@ -578,8 +584,8 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
taosArrayPush
(
pConsumerNew
->
assignedTopics
,
&
newTopicCopy
);
}
if
(
mndSetConsumerCommitLogs
(
pMnode
,
pTrans
,
pConsumerNew
)
!=
0
)
goto
SUBSCRIBE_OVER
;
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
goto
SUBSCRIBE_OVER
;
if
(
mndSetConsumerCommitLogs
(
pMnode
,
pTrans
,
pConsumerNew
)
!=
0
)
goto
_over
;
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
goto
_over
;
}
else
{
/*taosRLockLatch(&pConsumerOld->lock);*/
...
...
@@ -591,13 +597,13 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
if
(
status
!=
MQ_CONSUMER_STATUS__READY
)
{
terrno
=
TSDB_CODE_MND_CONSUMER_NOT_READY
;
goto
SUBSCRIBE_OVER
;
goto
_over
;
}
pConsumerNew
=
tNewSMqConsumerObj
(
consumerId
,
cgroup
);
if
(
pConsumerNew
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
SUBSCRIBE_OVER
;
goto
_over
;
}
pConsumerNew
->
updateType
=
CONSUMER_UPDATE__MODIFY
;
...
...
@@ -650,16 +656,16 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
/*if (taosArrayGetSize(pConsumerNew->assignedTopics) == 0) {*/
/*pConsumerNew->updateType = */
/*}*/
goto
SUBSCRIBE_OVER
;
goto
_over
;
}
if
(
mndSetConsumerCommitLogs
(
pMnode
,
pTrans
,
pConsumerNew
)
!=
0
)
goto
SUBSCRIBE_OVER
;
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
goto
SUBSCRIBE_OVER
;
if
(
mndSetConsumerCommitLogs
(
pMnode
,
pTrans
,
pConsumerNew
)
!=
0
)
goto
_over
;
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
goto
_over
;
}
code
=
TSDB_CODE_ACTION_IN_PROGRESS
;
SUBSCRIBE_OVER
:
_over
:
mndTransDrop
(
pTrans
);
if
(
pConsumerOld
)
{
...
...
@@ -971,16 +977,19 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
while
(
numOfRows
<
rowsCapacity
)
{
pShow
->
pIter
=
sdbFetch
(
pSdb
,
SDB_CONSUMER
,
pShow
->
pIter
,
(
void
**
)
&
pConsumer
);
if
(
pShow
->
pIter
==
NULL
)
break
;
if
(
pShow
->
pIter
==
NULL
)
{
break
;
}
if
(
taosArrayGetSize
(
pConsumer
->
assignedTopics
)
==
0
)
{
mDebug
(
"showing consumer
%"
PRId
64
" no assigned topic, skip"
,
pConsumer
->
consumerId
);
mDebug
(
"showing consumer
:0x%"
PRIx
64
" no assigned topic, skip"
,
pConsumer
->
consumerId
);
sdbRelease
(
pSdb
,
pConsumer
);
continue
;
}
taosRLockLatch
(
&
pConsumer
->
lock
);
mDebug
(
"showing consumer
%"
PRId
64
,
pConsumer
->
consumerId
);
mDebug
(
"showing consumer
:0x%"
PRIx
64
,
pConsumer
->
consumerId
);
int32_t
topicSz
=
taosArrayGetSize
(
pConsumer
->
assignedTopics
);
bool
hasTopic
=
true
;
...
...
source/dnode/mnode/impl/src/mndScheduler.c
浏览文件 @
9a5b7ce2
...
...
@@ -523,7 +523,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SVgObj
*
pVgroup
=
NULL
;
SQueryPlan
*
pPlan
=
NULL
;
SSubplan
*
plan
=
NULL
;
SSubplan
*
p
Subp
lan
=
NULL
;
if
(
pTopic
->
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
pPlan
=
qStringToQueryPlan
(
pTopic
->
physicalPlan
);
...
...
@@ -539,24 +539,27 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
return
-
1
;
}
SNodeListNode
*
inner
=
(
SNodeListNode
*
)
nodesListGetNode
(
pPlan
->
pSubplans
,
0
);
SNodeListNode
*
pNodeListNode
=
(
SNodeListNode
*
)
nodesListGetNode
(
pPlan
->
pSubplans
,
0
);
int32_t
opNum
=
LIST_LENGTH
(
inner
->
pNodeList
);
int32_t
opNum
=
LIST_LENGTH
(
pNodeListNode
->
pNodeList
);
if
(
opNum
!=
1
)
{
qDestroyQueryPlan
(
pPlan
);
terrno
=
TSDB_CODE_MND_INVALID_TOPIC_QUERY
;
return
-
1
;
}
plan
=
(
SSubplan
*
)
nodesListGetNode
(
inner
->
pNodeList
,
0
);
pSubplan
=
(
SSubplan
*
)
nodesListGetNode
(
pNodeListNode
->
pNodeList
,
0
);
}
ASSERT
(
pSub
->
unassignedVgs
);
ASSERT
(
taosHashGetSize
(
pSub
->
consumerHash
)
==
0
);
void
*
pIter
=
NULL
;
while
(
1
)
{
pIter
=
sdbFetch
(
pSdb
,
SDB_VGROUP
,
pIter
,
(
void
**
)
&
pVgroup
);
if
(
pIter
==
NULL
)
break
;
if
(
pIter
==
NULL
)
{
break
;
}
if
(
!
mndVgroupInDb
(
pVgroup
,
pTopic
->
dbUid
))
{
sdbRelease
(
pSdb
,
pVgroup
);
continue
;
...
...
@@ -569,15 +572,15 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
pVgEp
->
vgId
=
pVgroup
->
vgId
;
taosArrayPush
(
pSub
->
unassignedVgs
,
&
pVgEp
);
mDebug
(
"init subscription %s
, assign vg: %d"
,
pSub
->
key
,
pVgEp
->
vgId
);
mDebug
(
"init subscription %s
for topic:%s assign vgId:%d"
,
pSub
->
key
,
pTopic
->
name
,
pVgEp
->
vgId
);
if
(
pTopic
->
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
int32_t
msgLen
;
plan
->
execNode
.
epSet
=
pVgEp
->
epSet
;
plan
->
execNode
.
nodeId
=
pVgEp
->
vgId
;
p
Subp
lan
->
execNode
.
epSet
=
pVgEp
->
epSet
;
p
Subp
lan
->
execNode
.
nodeId
=
pVgEp
->
vgId
;
if
(
qSubPlanToString
(
plan
,
&
pVgEp
->
qmsg
,
&
msgLen
)
<
0
)
{
if
(
qSubPlanToString
(
p
Subp
lan
,
&
pVgEp
->
qmsg
,
&
msgLen
)
<
0
)
{
sdbRelease
(
pSdb
,
pVgroup
);
qDestroyQueryPlan
(
pPlan
);
terrno
=
TSDB_CODE_QRY_INVALID_INPUT
;
...
...
@@ -590,11 +593,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
sdbRelease
(
pSdb
,
pVgroup
);
}
ASSERT
(
pSub
->
unassignedVgs
->
size
>
0
);
ASSERT
(
taosHashGetSize
(
pSub
->
consumerHash
)
==
0
);
ASSERT
(
taosArrayGetSize
(
pSub
->
unassignedVgs
)
>
0
);
qDestroyQueryPlan
(
pPlan
);
return
0
;
}
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
9a5b7ce2
...
...
@@ -39,12 +39,10 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
static
int32_t
mndSubActionInsert
(
SSdb
*
pSdb
,
SMqSubscribeObj
*
);
static
int32_t
mndSubActionDelete
(
SSdb
*
pSdb
,
SMqSubscribeObj
*
);
static
int32_t
mndSubActionUpdate
(
SSdb
*
pSdb
,
SMqSubscribeObj
*
pOldSub
,
SMqSubscribeObj
*
pNewSub
);
static
int32_t
mndProcessRebalanceReq
(
SRpcMsg
*
pMsg
);
static
int32_t
mndProcessDropCgroupReq
(
SRpcMsg
*
pMsg
);
static
int32_t
mndRetrieveSubscribe
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
);
static
void
mndCancelGetNextSubscribe
(
SMnode
*
pMnode
,
void
*
pIter
);
static
int32_t
mndProcessRebalanceReq
(
SRpcMsg
*
pMsg
);
static
int32_t
mndProcessDropCgroupReq
(
SRpcMsg
*
pMsg
);
static
int32_t
mndRetrieveSubscribe
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
);
static
void
mndCancelGetNextSubscribe
(
SMnode
*
pMnode
,
void
*
pIter
);
static
int32_t
mndSetSubRedoLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SMqSubscribeObj
*
pSub
)
{
SSdbRaw
*
pRedoRaw
=
mndSubActionEncode
(
pSub
);
...
...
@@ -85,12 +83,13 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
return
sdbSetTable
(
pMnode
->
pSdb
,
table
);
}
static
SMqSubscribeObj
*
mndCreateSub
(
SMnode
*
pMnode
,
const
SMqTopicObj
*
pTopic
,
const
char
*
subKey
)
{
static
SMqSubscribeObj
*
mndCreateSub
scription
(
SMnode
*
pMnode
,
const
SMqTopicObj
*
pTopic
,
const
char
*
subKey
)
{
SMqSubscribeObj
*
pSub
=
tNewSubscribeObj
(
subKey
);
if
(
pSub
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
pSub
->
dbUid
=
pTopic
->
dbUid
;
pSub
->
stbUid
=
pTopic
->
stbUid
;
pSub
->
subType
=
pTopic
->
subType
;
...
...
@@ -205,7 +204,7 @@ static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
static
int32_t
mndDoRebalance
(
SMnode
*
pMnode
,
const
SMqRebInputObj
*
pInput
,
SMqRebOutputObj
*
pOutput
)
{
int32_t
totalVgNum
=
pOutput
->
pSub
->
vgNum
;
const
char
*
sub
=
pOutput
->
pSub
->
key
;
mInfo
(
"sub:%s
, mq rebalance vgNum:%d
"
,
sub
,
pOutput
->
pSub
->
vgNum
);
mInfo
(
"sub:%s
mq re-balance %d vgroups
"
,
sub
,
pOutput
->
pSub
->
vgNum
);
// 1. build temporary hash(vgId -> SMqRebOutputVg) to store modified vg
SHashObj
*
pHash
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
false
,
HASH_NO_LOCK
);
...
...
@@ -214,7 +213,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
int32_t
removedNum
=
taosArrayGetSize
(
pInput
->
pRebInfo
->
removedConsumers
);
int32_t
actualRemoved
=
0
;
for
(
int32_t
i
=
0
;
i
<
removedNum
;
i
++
)
{
int64_t
consumerId
=
*
(
int64_t
*
)
taosArrayGet
(
pInput
->
pRebInfo
->
removedConsumers
,
i
);
uint64_t
consumerId
=
*
(
u
int64_t
*
)
taosArrayGet
(
pInput
->
pRebInfo
->
removedConsumers
,
i
);
SMqConsumerEp
*
pConsumerEp
=
taosHashGet
(
pOutput
->
pSub
->
consumerHash
,
&
consumerId
,
sizeof
(
int64_t
));
...
...
@@ -229,7 +228,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
.
pVgEp
=
pVgEp
,
};
taosHashPut
(
pHash
,
&
pVgEp
->
vgId
,
sizeof
(
int32_t
),
&
outputVg
,
sizeof
(
SMqRebOutputVg
));
mInfo
(
"sub:%s
, mq re
balance remove vgId:%d from consumer:%"
PRId64
,
sub
,
pVgEp
->
vgId
,
consumerId
);
mInfo
(
"sub:%s
mq re-
balance remove vgId:%d from consumer:%"
PRId64
,
sub
,
pVgEp
->
vgId
,
consumerId
);
}
taosArrayDestroy
(
pConsumerEp
->
vgs
);
taosHashRemove
(
pOutput
->
pSub
->
consumerHash
,
&
consumerId
,
sizeof
(
int64_t
));
...
...
@@ -239,7 +238,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
}
if
(
removedNum
!=
actualRemoved
)
{
mError
(
"sub:%s
, mq re
balance removedNum:%d not matched with actual:%d"
,
sub
,
removedNum
,
actualRemoved
);
mError
(
"sub:%s
mq re-
balance removedNum:%d not matched with actual:%d"
,
sub
,
removedNum
,
actualRemoved
);
}
// if previously no consumer, there are vgs not assigned
...
...
@@ -253,7 +252,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
.
pVgEp
=
pVgEp
,
};
taosHashPut
(
pHash
,
&
pVgEp
->
vgId
,
sizeof
(
int32_t
),
&
rebOutput
,
sizeof
(
SMqRebOutputVg
));
mInfo
(
"sub:%s
, mq re
balance remove vgId:%d from unassigned"
,
sub
,
pVgEp
->
vgId
);
mInfo
(
"sub:%s
mq re-
balance remove vgId:%d from unassigned"
,
sub
,
pVgEp
->
vgId
);
}
}
...
...
@@ -267,7 +266,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
minVgCnt
=
totalVgNum
/
afterRebConsumerNum
;
imbConsumerNum
=
totalVgNum
%
afterRebConsumerNum
;
}
mInfo
(
"sub:%s, mq rebalance %d consumer after rebalance, at least %d vg each, %d consumer has more vg"
,
sub
,
mInfo
(
"sub:%s mq re-balance %d consumers: at least %d vg each, %d consumer has more vg"
,
sub
,
afterRebConsumerNum
,
minVgCnt
,
imbConsumerNum
);
// 4. first scan: remove consumer more than wanted, put to remove hash
...
...
@@ -275,7 +275,10 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
void
*
pIter
=
NULL
;
while
(
1
)
{
pIter
=
taosHashIterate
(
pOutput
->
pSub
->
consumerHash
,
pIter
);
if
(
pIter
==
NULL
)
break
;
if
(
pIter
==
NULL
)
{
break
;
}
SMqConsumerEp
*
pConsumerEp
=
(
SMqConsumerEp
*
)
pIter
;
int32_t
consumerVgNum
=
taosArrayGetSize
(
pConsumerEp
->
vgs
);
...
...
@@ -297,7 +300,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
.
pVgEp
=
pVgEp
,
};
taosHashPut
(
pHash
,
&
pVgEp
->
vgId
,
sizeof
(
int32_t
),
&
outputVg
,
sizeof
(
SMqRebOutputVg
));
mInfo
(
"sub:%s
,
mq rebalance remove vgId:%d from consumer:%"
PRId64
",(first scan)"
,
sub
,
pVgEp
->
vgId
,
mInfo
(
"sub:%s mq rebalance remove vgId:%d from consumer:%"
PRId64
",(first scan)"
,
sub
,
pVgEp
->
vgId
,
pConsumerEp
->
consumerId
);
}
imbCnt
++
;
...
...
@@ -312,7 +315,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
.
pVgEp
=
pVgEp
,
};
taosHashPut
(
pHash
,
&
pVgEp
->
vgId
,
sizeof
(
int32_t
),
&
outputVg
,
sizeof
(
SMqRebOutputVg
));
mInfo
(
"sub:%s
,
mq rebalance remove vgId:%d from consumer:%"
PRId64
",(first scan)"
,
sub
,
pVgEp
->
vgId
,
mInfo
(
"sub:%s mq rebalance remove vgId:%d from consumer:%"
PRId64
",(first scan)"
,
sub
,
pVgEp
->
vgId
,
pConsumerEp
->
consumerId
);
}
}
...
...
@@ -330,7 +333,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
newConsumerEp
.
vgs
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
taosHashPut
(
pOutput
->
pSub
->
consumerHash
,
&
consumerId
,
sizeof
(
int64_t
),
&
newConsumerEp
,
sizeof
(
SMqConsumerEp
));
taosArrayPush
(
pOutput
->
newConsumers
,
&
consumerId
);
mInfo
(
"sub:%s
,
mq rebalance add new consumer:%"
PRId64
,
sub
,
consumerId
);
mInfo
(
"sub:%s mq rebalance add new consumer:%"
PRId64
,
sub
,
consumerId
);
}
}
...
...
@@ -349,7 +352,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
// iter hash and find one vg
pRemovedIter
=
taosHashIterate
(
pHash
,
pRemovedIter
);
if
(
pRemovedIter
==
NULL
)
{
mError
(
"sub:%s
,
removed iter is null"
,
sub
);
mError
(
"sub:%s removed iter is null"
,
sub
);
continue
;
}
...
...
@@ -402,33 +405,36 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
SMqRebOutputVg
*
pRebOutput
=
NULL
;
while
(
1
)
{
pIter
=
taosHashIterate
(
pHash
,
pIter
);
if
(
pIter
==
NULL
)
break
;
if
(
pIter
==
NULL
)
{
break
;
}
pRebOutput
=
(
SMqRebOutputVg
*
)
pIter
;
taosArrayPush
(
pOutput
->
pSub
->
unassignedVgs
,
&
pRebOutput
->
pVgEp
);
taosArrayPush
(
pOutput
->
rebVgs
,
pRebOutput
);
mInfo
(
"sub:%s
, mq re
balance unassign vgId:%d (second scan)"
,
sub
,
pRebOutput
->
pVgEp
->
vgId
);
mInfo
(
"sub:%s
mq re-
balance unassign vgId:%d (second scan)"
,
sub
,
pRebOutput
->
pVgEp
->
vgId
);
}
}
// 8. generate logs
mInfo
(
"sub:%s
, mq rebalance calculation completed, re
balanced vg"
,
sub
);
mInfo
(
"sub:%s
mq re-balance calculation completed, re-
balanced vg"
,
sub
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pOutput
->
rebVgs
);
i
++
)
{
SMqRebOutputVg
*
pOutputRebVg
=
taosArrayGet
(
pOutput
->
rebVgs
,
i
);
mInfo
(
"sub:%s
, mq rebalance vgId:%d, moved from consumer:%"
PRId64
", to consumer:%"
PRId
64
,
sub
,
mInfo
(
"sub:%s
mq re-balance vgId:%d, moved from consumer:0x%"
PRIx64
", to consumer:0x%"
PRIx
64
,
sub
,
pOutputRebVg
->
pVgEp
->
vgId
,
pOutputRebVg
->
oldConsumerId
,
pOutputRebVg
->
newConsumerId
);
}
{
void
*
pIter
=
NULL
;
pIter
=
NULL
;
while
(
1
)
{
pIter
=
taosHashIterate
(
pOutput
->
pSub
->
consumerHash
,
pIter
);
if
(
pIter
==
NULL
)
break
;
SMqConsumerEp
*
pConsumerEp
=
(
SMqConsumerEp
*
)
pIter
;
int32_t
sz
=
taosArrayGetSize
(
pConsumerEp
->
vgs
);
mInfo
(
"sub:%s
, mq rebalance final cfg: consumer
%"
PRId64
" has %d vg"
,
sub
,
pConsumerEp
->
consumerId
,
sz
);
mInfo
(
"sub:%s
mq re-balance final cfg: consumer:0x
%"
PRId64
" has %d vg"
,
sub
,
pConsumerEp
->
consumerId
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqVgEp
*
pVgEp
=
taosArrayGetP
(
pConsumerEp
->
vgs
,
i
);
mInfo
(
"sub:%s
, mq rebalance final cfg: vg %d to consumer %"
PRId64
""
,
sub
,
pVgEp
->
vgId
,
mInfo
(
"sub:%s
mq re-balance final cfg: vg %d to consumer:0x%"
PRId64
,
sub
,
pVgEp
->
vgId
,
pConsumerEp
->
consumerId
);
}
}
...
...
@@ -552,11 +558,14 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
SMqDoRebalanceMsg
*
pReq
=
pMsg
->
pCont
;
void
*
pIter
=
NULL
;
mInfo
(
"mq rebalance start"
);
mInfo
(
"mq re
-
balance start"
);
while
(
1
)
{
pIter
=
taosHashIterate
(
pReq
->
rebSubHash
,
pIter
);
if
(
pIter
==
NULL
)
break
;
if
(
pIter
==
NULL
)
{
break
;
}
SMqRebInputObj
rebInput
=
{
0
};
SMqRebOutputObj
rebOutput
=
{
0
};
...
...
@@ -577,12 +586,13 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
mndSplitSubscribeKey
(
pRebInfo
->
key
,
topic
,
cgroup
,
true
);
SMqTopicObj
*
pTopic
=
mndAcquireTopic
(
pMnode
,
topic
);
if
(
pTopic
==
NULL
)
{
mError
(
"mq re
balance %s failed since topic %s not exist, abor
t"
,
pRebInfo
->
key
,
topic
);
mError
(
"mq re
-balance %s ignored since topic %s not exis
t"
,
pRebInfo
->
key
,
topic
);
continue
;
}
taosRLockLatch
(
&
pTopic
->
lock
);
rebOutput
.
pSub
=
mndCreateSub
(
pMnode
,
pTopic
,
pRebInfo
->
key
);
rebOutput
.
pSub
=
mndCreateSub
scription
(
pMnode
,
pTopic
,
pRebInfo
->
key
);
if
(
rebOutput
.
pSub
==
NULL
)
{
mError
(
"mq rebalance %s failed create sub since %s, abort"
,
pRebInfo
->
key
,
terrstr
());
...
...
@@ -605,15 +615,16 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
}
if
(
mndDoRebalance
(
pMnode
,
&
rebInput
,
&
rebOutput
)
<
0
)
{
mError
(
"mq rebalance internal error"
);
mError
(
"mq re
-
balance internal error"
);
}
// if add more consumer to balanced subscribe,
// possibly no vg is changed
if
(
mndPersistRebResult
(
pMnode
,
pMsg
,
&
rebOutput
)
<
0
)
{
mError
(
"mq re
balance persist re
balance output error, possibly vnode splitted or dropped"
);
mError
(
"mq re
-balance persist re-
balance output error, possibly vnode splitted or dropped"
);
}
taosArrayDestroy
(
pRebInfo
->
lostConsumers
);
taosArrayDestroy
(
pRebInfo
->
newConsumers
);
taosArrayDestroy
(
pRebInfo
->
removedConsumers
);
...
...
@@ -627,19 +638,18 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
}
// reset flag
mInfo
(
"mq rebalance completed successfully"
);
mInfo
(
"mq re
-
balance completed successfully"
);
taosHashCleanup
(
pReq
->
rebSubHash
);
mndRebEnd
();
return
0
;
}
static
int32_t
mndProcessDropCgroupReq
(
SRpcMsg
*
pReq
)
{
SMnode
*
pMnode
=
pReq
->
info
.
node
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
static
int32_t
mndProcessDropCgroupReq
(
SRpcMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
info
.
node
;
SMDropCgroupReq
dropReq
=
{
0
};
if
(
tDeserializeSMDropCgroupReq
(
p
Req
->
pCont
,
pReq
->
contLen
,
&
dropReq
)
!=
0
)
{
if
(
tDeserializeSMDropCgroupReq
(
p
Msg
->
pCont
,
pMsg
->
contLen
,
&
dropReq
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
return
-
1
;
}
...
...
@@ -663,7 +673,7 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pReq) {
return
-
1
;
}
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_CONFLICT_NOTHING
,
p
Req
,
"drop-cgroup"
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_CONFLICT_NOTHING
,
p
Msg
,
"drop-cgroup"
);
if
(
pTrans
==
NULL
)
{
mError
(
"cgroup: %s on topic:%s, failed to drop since %s"
,
dropReq
.
cgroup
,
dropReq
.
topic
,
terrstr
());
mndReleaseSubscribe
(
pMnode
,
pSub
);
...
...
@@ -956,7 +966,7 @@ END:
return
code
;
}
static
int32_t
mndRetrieveSubscribe
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rowsCapacity
)
{
int32_t
mndRetrieveSubscribe
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rowsCapacity
)
{
SMnode
*
pMnode
=
pReq
->
info
.
node
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
int32_t
numOfRows
=
0
;
...
...
@@ -1090,7 +1100,7 @@ static int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
return
numOfRows
;
}
static
void
mndCancelGetNextSubscribe
(
SMnode
*
pMnode
,
void
*
pIter
)
{
void
mndCancelGetNextSubscribe
(
SMnode
*
pMnode
,
void
*
pIter
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
sdbCancelFetch
(
pSdb
,
pIter
);
}
source/dnode/mnode/impl/src/mndTopic.c
浏览文件 @
9a5b7ce2
...
...
@@ -33,7 +33,7 @@
static
int32_t
mndTopicActionInsert
(
SSdb
*
pSdb
,
SMqTopicObj
*
pTopic
);
static
int32_t
mndTopicActionDelete
(
SSdb
*
pSdb
,
SMqTopicObj
*
pTopic
);
static
int32_t
mndTopicActionUpdate
(
SSdb
*
pSdb
,
SMqTopicObj
*
pTopic
,
SMqTopicObj
*
pNewTopic
);
static
int32_t
mndTopicActionUpdate
(
SSdb
*
pSdb
,
SMqTopicObj
*
p
Old
Topic
,
SMqTopicObj
*
pNewTopic
);
static
int32_t
mndProcessCreateTopicReq
(
SRpcMsg
*
pReq
);
static
int32_t
mndProcessDropTopicReq
(
SRpcMsg
*
pReq
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录