Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
f0d7d1b3
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
f0d7d1b3
编写于
2月 11, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
mq rebalance
上级
42da4f6f
变更
12
隐藏空白更改
内联
并排
Showing
12 changed file
with
148 addition
and
32 deletion
+148
-32
example/src/tmq.c
example/src/tmq.c
+1
-1
include/common/tmsg.h
include/common/tmsg.h
+57
-5
include/common/tmsgdef.h
include/common/tmsgdef.h
+1
-0
source/client/src/tmq.c
source/client/src/tmq.c
+2
-1
source/dnode/mgmt/impl/src/dndTransport.c
source/dnode/mgmt/impl/src/dndTransport.c
+2
-0
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+33
-4
source/dnode/mnode/impl/src/mndConsumer.c
source/dnode/mnode/impl/src/mndConsumer.c
+1
-1
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+31
-19
source/dnode/mnode/impl/src/mnode.c
source/dnode/mnode/impl/src/mnode.c
+1
-1
source/dnode/vnode/inc/tq.h
source/dnode/vnode/inc/tq.h
+1
-0
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+13
-0
source/dnode/vnode/src/vnd/vnodeWrite.c
source/dnode/vnode/src/vnd/vnodeWrite.c
+5
-0
未找到文件。
example/src/tmq.c
浏览文件 @
f0d7d1b3
...
...
@@ -116,7 +116,7 @@ void basic_consume_loop(tmq_t *tmq,
int32_t
cnt
=
0
;
/*clock_t startTime = clock();*/
while
(
running
)
{
tmq_message_t
*
tmqmessage
=
tmq_consumer_poll
(
tmq
,
0
);
tmq_message_t
*
tmqmessage
=
tmq_consumer_poll
(
tmq
,
50
0
);
if
(
tmqmessage
)
{
cnt
++
;
msg_process
(
tmqmessage
);
...
...
include/common/tmsg.h
浏览文件 @
f0d7d1b3
...
...
@@ -1646,7 +1646,7 @@ static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) {
return
buf
;
}
typedef
struct
SMqSetCVgReq
{
typedef
struct
{
int64_t
leftForVer
;
int32_t
vgId
;
int64_t
oldConsumerId
;
...
...
@@ -1711,7 +1711,51 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
return
buf
;
}
typedef
struct
SMqSetCVgRsp
{
typedef
struct
{
int64_t
leftForVer
;
int32_t
vgId
;
int64_t
oldConsumerId
;
int64_t
newConsumerId
;
//char topicName[TSDB_TOPIC_FNAME_LEN];
//char cgroup[TSDB_CONSUMER_GROUP_LEN];
//char* sql;
//char* logicalPlan;
//char* physicalPlan;
//char* qmsg;
}
SMqMVRebReq
;
static
FORCE_INLINE
int32_t
tEncodeSMqMVRebReq
(
void
**
buf
,
const
SMqMVRebReq
*
pReq
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI64
(
buf
,
pReq
->
leftForVer
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pReq
->
vgId
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pReq
->
oldConsumerId
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pReq
->
newConsumerId
);
//tlen += taosEncodeString(buf, pReq->topicName);
//tlen += taosEncodeString(buf, pReq->cgroup);
//tlen += taosEncodeString(buf, pReq->sql);
//tlen += taosEncodeString(buf, pReq->logicalPlan);
//tlen += taosEncodeString(buf, pReq->physicalPlan);
//tlen += taosEncodeString(buf, pReq->qmsg);
//tlen += tEncodeSSubQueryMsg(buf, &pReq->msg);
return
tlen
;
}
static
FORCE_INLINE
void
*
tDecodeSMqMVRebReq
(
void
*
buf
,
SMqMVRebReq
*
pReq
)
{
buf
=
taosDecodeFixedI64
(
buf
,
&
pReq
->
leftForVer
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pReq
->
vgId
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pReq
->
oldConsumerId
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pReq
->
newConsumerId
);
//buf = taosDecodeStringTo(buf, pReq->topicName);
//buf = taosDecodeStringTo(buf, pReq->cgroup);
//buf = taosDecodeString(buf, &pReq->sql);
//buf = taosDecodeString(buf, &pReq->logicalPlan);
//buf = taosDecodeString(buf, &pReq->physicalPlan);
//buf = taosDecodeString(buf, &pReq->qmsg);
//buf = tDecodeSSubQueryMsg(buf, &pReq->msg);
return
buf
;
}
typedef
struct
{
SMsgHead
header
;
int32_t
vgId
;
int64_t
consumerId
;
...
...
@@ -1719,6 +1763,14 @@ typedef struct SMqSetCVgRsp {
char
cGroup
[
TSDB_CONSUMER_GROUP_LEN
];
}
SMqSetCVgRsp
;
typedef
struct
{
SMsgHead
header
;
int32_t
vgId
;
int64_t
consumerId
;
char
topicName
[
TSDB_TOPIC_FNAME_LEN
];
char
cGroup
[
TSDB_CONSUMER_GROUP_LEN
];
}
SMqMVRebRsp
;
typedef
struct
{
uint32_t
nCols
;
SSchema
*
pSchema
;
...
...
@@ -1819,7 +1871,7 @@ typedef struct {
typedef
struct
{
int64_t
consumerId
;
int
64
_t
epoch
;
int
32
_t
epoch
;
char
cgroup
[
TSDB_CONSUMER_GROUP_LEN
];
SArray
*
topics
;
// SArray<SMqSubTopicEp>
}
SMqCMGetSubEpRsp
;
...
...
@@ -1876,7 +1928,7 @@ static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicE
static
FORCE_INLINE
int32_t
tEncodeSMqCMGetSubEpRsp
(
void
**
buf
,
const
SMqCMGetSubEpRsp
*
pRsp
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI64
(
buf
,
pRsp
->
consumerId
);
tlen
+=
taosEncodeFixedI
64
(
buf
,
pRsp
->
epoch
);
tlen
+=
taosEncodeFixedI
32
(
buf
,
pRsp
->
epoch
);
tlen
+=
taosEncodeString
(
buf
,
pRsp
->
cgroup
);
int32_t
sz
=
taosArrayGetSize
(
pRsp
->
topics
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
...
...
@@ -1889,7 +1941,7 @@ static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSu
static
FORCE_INLINE
void
*
tDecodeSMqCMGetSubEpRsp
(
void
*
buf
,
SMqCMGetSubEpRsp
*
pRsp
)
{
buf
=
taosDecodeFixedI64
(
buf
,
&
pRsp
->
consumerId
);
buf
=
taosDecodeFixedI
64
(
buf
,
&
pRsp
->
epoch
);
buf
=
taosDecodeFixedI
32
(
buf
,
&
pRsp
->
epoch
);
buf
=
taosDecodeStringTo
(
buf
,
pRsp
->
cgroup
);
int32_t
sz
;
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
...
...
include/common/tmsgdef.h
浏览文件 @
f0d7d1b3
...
...
@@ -163,6 +163,7 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_VND_MQ_CONNECT
,
"vnode-mq-connect"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_MQ_DISCONNECT
,
"vnode-mq-disconnect"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_MQ_SET_CONN
,
"vnode-mq-set-conn"
,
SMqSetCVgReq
,
SMqSetCVgRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_MQ_REB
,
"vnode-mq-mv-rebalance"
,
SMqMVRebReq
,
SMqMVRebRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_MQ_SET_CUR
,
"vnode-mq-set-cur"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_RES_READY
,
"vnode-res-ready"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_TASKS_STATUS
,
"vnode-tasks-status"
,
NULL
,
NULL
)
...
...
source/client/src/tmq.c
浏览文件 @
f0d7d1b3
...
...
@@ -58,7 +58,7 @@ struct tmq_t {
char
clientId
[
256
];
SRWLatch
lock
;
int64_t
consumerId
;
int
64
_t
epoch
;
int
32
_t
epoch
;
int64_t
status
;
tsem_t
rspSem
;
STscObj
*
pTscObj
;
...
...
@@ -592,6 +592,7 @@ int32_t tmqAsyncAskEp(tmq_t* tmq, bool wait) {
goto
END
;
}
buf
->
consumerId
=
htobe64
(
tmq
->
consumerId
);
buf
->
epoch
=
htonl
(
tmq
->
epoch
);
strcpy
(
buf
->
cgroup
,
tmq
->
groupId
);
SRequestObj
*
pRequest
=
createRequest
(
tmq
->
pTscObj
,
NULL
,
NULL
,
TDMT_MND_GET_SUB_EP
);
...
...
source/dnode/mgmt/impl/src/dndTransport.c
浏览文件 @
f0d7d1b3
...
...
@@ -115,6 +115,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_SUBSCRIBE
)]
=
dndProcessMnodeWriteMsg
;
/*pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBSCRIBE_RSP)] = dndProcessMnodeWriteMsg;*/
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_VND_MQ_SET_CONN_RSP
)]
=
dndProcessMnodeWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_VND_MQ_REB_RSP
)]
=
dndProcessMnodeWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_GET_SUB_EP
)]
=
dndProcessMnodeReadMsg
;
// Requests handled by VNODE
...
...
@@ -148,6 +149,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_VND_SHOW_TABLES
)]
=
dndProcessVnodeFetchMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_VND_SHOW_TABLES_FETCH
)]
=
dndProcessVnodeFetchMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_VND_MQ_SET_CONN
)]
=
dndProcessVnodeWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_VND_MQ_REB
)]
=
dndProcessVnodeWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_VND_MQ_SET_CUR
)]
=
dndProcessVnodeFetchMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_VND_CONSUME
)]
=
dndProcessVnodeFetchMsg
;
}
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
f0d7d1b3
...
...
@@ -421,6 +421,7 @@ typedef struct {
int32_t
status
;
int32_t
vgNum
;
SArray
*
consumers
;
// SArray<SMqSubConsumer>
SArray
*
lostConsumers
;
// SArray<SMqSubConsumer>
SArray
*
unassignedVg
;
// SArray<SMqConsumerEp>
}
SMqSubscribeObj
;
...
...
@@ -429,10 +430,17 @@ static FORCE_INLINE SMqSubscribeObj* tNewSubscribeObj() {
if
(
pSub
==
NULL
)
{
return
NULL
;
}
pSub
->
consumers
=
taosArrayInit
(
0
,
sizeof
(
SMqSubConsumer
));
if
(
pSub
->
consumers
==
NULL
)
{
goto
_err
;
}
pSub
->
lostConsumers
=
taosArrayInit
(
0
,
sizeof
(
SMqSubConsumer
));
if
(
pSub
->
lostConsumers
==
NULL
)
{
goto
_err
;
}
pSub
->
unassignedVg
=
taosArrayInit
(
0
,
sizeof
(
SMqConsumerEp
));
if
(
pSub
->
unassignedVg
==
NULL
)
{
goto
_err
;
...
...
@@ -445,8 +453,9 @@ static FORCE_INLINE SMqSubscribeObj* tNewSubscribeObj() {
return
pSub
;
_err:
tfree
(
pSub
->
unassignedVg
);
tfree
(
pSub
->
consumers
);
tfree
(
pSub
->
lostConsumers
);
tfree
(
pSub
->
unassignedVg
);
tfree
(
pSub
);
return
NULL
;
}
...
...
@@ -465,6 +474,13 @@ static FORCE_INLINE int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeOb
tlen
+=
tEncodeSMqSubConsumer
(
buf
,
pSubConsumer
);
}
sz
=
taosArrayGetSize
(
pSub
->
lostConsumers
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqSubConsumer
*
pSubConsumer
=
taosArrayGet
(
pSub
->
lostConsumers
,
i
);
tlen
+=
tEncodeSMqSubConsumer
(
buf
,
pSubConsumer
);
}
sz
=
taosArrayGetSize
(
pSub
->
unassignedVg
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
...
...
@@ -493,6 +509,17 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub)
taosArrayPush
(
pSub
->
consumers
,
&
subConsumer
);
}
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
pSub
->
lostConsumers
=
taosArrayInit
(
sz
,
sizeof
(
SMqSubConsumer
));
if
(
pSub
->
lostConsumers
==
NULL
)
{
return
NULL
;
}
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqSubConsumer
subConsumer
=
{
0
};
buf
=
tDecodeSMqSubConsumer
(
buf
,
&
subConsumer
);
taosArrayPush
(
pSub
->
lostConsumers
,
&
subConsumer
);
}
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
pSub
->
unassignedVg
=
taosArrayInit
(
sz
,
sizeof
(
SMqConsumerEp
));
if
(
pSub
->
unassignedVg
==
NULL
)
{
...
...
@@ -542,7 +569,7 @@ typedef struct {
char
cgroup
[
TSDB_CONSUMER_GROUP_LEN
];
SArray
*
currentTopics
;
// SArray<char*>
SArray
*
recentRemovedTopics
;
// SArray<char*>
int
64
_t
epoch
;
int
32
_t
epoch
;
// stat
int64_t
pollCnt
;
// status
...
...
@@ -558,8 +585,9 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerO
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI64
(
buf
,
pConsumer
->
consumerId
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pConsumer
->
connId
);
tlen
+=
taosEncodeFixedI
64
(
buf
,
pConsumer
->
epoch
);
tlen
+=
taosEncodeFixedI
32
(
buf
,
pConsumer
->
epoch
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pConsumer
->
pollCnt
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pConsumer
->
status
);
tlen
+=
taosEncodeString
(
buf
,
pConsumer
->
cgroup
);
sz
=
taosArrayGetSize
(
pConsumer
->
currentTopics
);
...
...
@@ -582,8 +610,9 @@ static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pCons
int32_t
sz
;
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumer
->
consumerId
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumer
->
connId
);
buf
=
taosDecodeFixedI
64
(
buf
,
&
pConsumer
->
epoch
);
buf
=
taosDecodeFixedI
32
(
buf
,
&
pConsumer
->
epoch
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumer
->
pollCnt
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pConsumer
->
status
);
buf
=
taosDecodeStringTo
(
buf
,
pConsumer
->
cgroup
);
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
...
...
source/dnode/mnode/impl/src/mndConsumer.c
浏览文件 @
f0d7d1b3
...
...
@@ -61,7 +61,7 @@ SMqConsumerObj* mndCreateConsumer(int64_t consumerId, const char* cgroup) {
}
pConsumer
->
epoch
=
1
;
pConsumer
->
consumerId
=
consumerId
;
pConsumer
->
status
=
MQ_CONSUMER_STATUS__INIT
;
atomic_store_32
(
&
pConsumer
->
status
,
MQ_CONSUMER_STATUS__INIT
)
;
strcpy
(
pConsumer
->
cgroup
,
cgroup
);
taosInitRWLatch
(
&
pConsumer
->
lock
);
return
pConsumer
;
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
f0d7d1b3
...
...
@@ -72,6 +72,7 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
mndSetMsgHandle
(
pMnode
,
TDMT_MND_SUBSCRIBE
,
mndProcessSubscribeReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_MQ_SET_CONN_RSP
,
mndProcessSubscribeInternalRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_VND_MQ_REB_RSP
,
mndProcessSubscribeInternalRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_MQ_TIMER
,
mndProcessMqTimerMsg
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_GET_SUB_EP
,
mndProcessGetSubEpReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_MQ_DO_REBALANCE
,
mndProcessDoRebalanceMsg
);
...
...
@@ -105,13 +106,13 @@ static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj
}
static
int32_t
mndBuildRebalanceMsg
(
void
**
pBuf
,
int32_t
*
pLen
,
const
SMqConsumerEp
*
pConsumerEp
)
{
SMq
SetCVg
Req
req
=
{
SMq
MVReb
Req
req
=
{
.
vgId
=
pConsumerEp
->
vgId
,
.
oldConsumerId
=
pConsumerEp
->
oldConsumerId
,
.
newConsumerId
=
pConsumerEp
->
consumerId
,
};
int32_t
tlen
=
tEncodeSMq
SetCVg
Req
(
NULL
,
&
req
);
int32_t
tlen
=
tEncodeSMq
MVReb
Req
(
NULL
,
&
req
);
void
*
buf
=
malloc
(
sizeof
(
SMsgHead
)
+
tlen
);
if
(
buf
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -123,7 +124,7 @@ static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsume
pMsgHead
->
vgId
=
htonl
(
pConsumerEp
->
vgId
);
void
*
abuf
=
POINTER_SHIFT
(
buf
,
sizeof
(
SMsgHead
));
tEncodeSMq
SetCVg
Req
(
&
abuf
,
&
req
);
tEncodeSMq
MVReb
Req
(
&
abuf
,
&
req
);
*
pBuf
=
buf
;
*
pLen
=
tlen
;
...
...
@@ -208,6 +209,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
SMqCMGetSubEpReq
*
pReq
=
(
SMqCMGetSubEpReq
*
)
pMsg
->
rpcMsg
.
pCont
;
SMqCMGetSubEpRsp
rsp
=
{
0
};
int64_t
consumerId
=
be64toh
(
pReq
->
consumerId
);
int32_t
epoch
=
ntohl
(
pReq
->
epoch
);
SMqConsumerObj
*
pConsumer
=
mndAcquireConsumer
(
pMsg
->
pMnode
,
consumerId
);
if
(
pConsumer
==
NULL
)
{
...
...
@@ -216,10 +218,19 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
}
ASSERT
(
strcmp
(
pReq
->
cgroup
,
pConsumer
->
cgroup
)
==
0
);
//TODO
int32_t
hbStatus
=
atomic_load_32
(
&
pConsumer
->
hbStatus
);
mInfo
(
"try to get sub ep, old val: %d"
,
hbStatus
);
atomic_store_32
(
&
pConsumer
->
hbStatus
,
0
);
/*SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer);*/
/*sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);*/
/*sdbWrite(pMnode->pSdb, pConsumerRaw);*/
strcpy
(
rsp
.
cgroup
,
pReq
->
cgroup
);
rsp
.
consumerId
=
consumerId
;
rsp
.
epoch
=
pConsumer
->
epoch
;
if
(
pReq
->
epoch
!=
rsp
.
epoch
)
{
if
(
epoch
!=
rsp
.
epoch
)
{
mInfo
(
"old epoch %d, new epoch %d"
,
epoch
,
rsp
.
epoch
);
SArray
*
pTopics
=
pConsumer
->
currentTopics
;
int
sz
=
taosArrayGetSize
(
pTopics
);
rsp
.
topics
=
taosArrayInit
(
sz
,
sizeof
(
SMqSubTopicEp
));
...
...
@@ -258,6 +269,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
void
*
abuf
=
buf
;
tEncodeSMqCMGetSubEpRsp
(
&
abuf
,
&
rsp
);
tDeleteSMqCMGetSubEpRsp
(
&
rsp
);
mndReleaseConsumer
(
pMnode
,
pConsumer
);
pMsg
->
pCont
=
buf
;
pMsg
->
contLen
=
tlen
;
return
0
;
...
...
@@ -373,9 +385,10 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
mInfo
(
"mq remove lost consumer %ld"
,
lostConsumerId
);
for
(
int
j
=
0
;
j
<
taosArrayGetSize
(
pSub
->
consumers
);
j
++
)
{
SMqConsumerEp
*
pConsumerEp
=
taosArrayGet
(
pSub
->
consumers
,
j
);
if
(
pConsumerEp
->
consumerId
==
lostConsumerId
)
{
taosArrayPush
(
pSub
->
unassignedVg
,
pConsumerEp
);
SMqSubConsumer
*
pSubConsumer
=
taosArrayGet
(
pSub
->
consumers
,
j
);
if
(
pSubConsumer
->
consumerId
==
lostConsumerId
)
{
taosArrayAddAll
(
pSub
->
unassignedVg
,
pSubConsumer
->
vgInfo
);
taosArrayPush
(
pSub
->
lostConsumers
,
pSubConsumer
);
taosArrayRemove
(
pSub
->
consumers
,
j
);
break
;
}
...
...
@@ -389,8 +402,8 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
int32_t
vgEachConsumer
=
vgNum
/
consumerNum
;
int32_t
imbalanceVg
=
vgNum
%
consumerNum
;
int32_t
imbalanceSolved
=
0
;
SArray
*
unassignedVgStash
=
taosArrayInit
(
0
,
sizeof
(
SMqConsumerEp
));
SArray
*
unassignedConsumerIdx
=
taosArrayInit
(
0
,
sizeof
(
int32_t
));
/*SArray *unassignedVgStash = taosArrayInit(0, sizeof(SMqConsumerEp));*/
/*SArray *unassignedConsumerIdx = taosArrayInit(0, sizeof(int32_t));*/
// iterate all consumers, set unassignedVgStash
for
(
int
i
=
0
;
i
<
consumerNum
;
i
++
)
{
...
...
@@ -400,13 +413,13 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
if
(
i
<
imbalanceVg
)
vgThisConsumerAfterRb
=
vgEachConsumer
+
1
;
else
vgThisConsumerAfterRb
=
vgEachConsumer
;
mInfo
(
"mq consumer:%ld
,
connectted vgroup change from %d %d"
,
pSubConsumer
->
consumerId
,
vgThisConsumerBeforeRb
,
vgThisConsumerAfterRb
);
mInfo
(
"mq consumer:%ld
,
connectted vgroup change from %d %d"
,
pSubConsumer
->
consumerId
,
vgThisConsumerBeforeRb
,
vgThisConsumerAfterRb
);
while
(
taosArrayGetSize
(
pSubConsumer
->
vgInfo
)
>
vgThisConsumerAfterRb
)
{
SMqConsumerEp
*
pConsumerEp
=
taosArrayPop
(
pSubConsumer
->
vgInfo
);
ASSERT
(
pConsumerEp
!=
NULL
);
ASSERT
(
pConsumerEp
->
consumerId
==
pSubConsumer
->
consumerId
);
taosArrayPush
(
unassignedVgStash
,
pConsumerEp
);
taosArrayPush
(
pSub
->
unassignedVg
,
pConsumerEp
);
}
SMqConsumerObj
*
pRebConsumer
=
mndAcquireConsumer
(
pMnode
,
pSubConsumer
->
consumerId
);
...
...
@@ -422,7 +435,7 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
atomic_store_32
(
&
pRebConsumer
->
status
,
MQ_CONSUMER_STATUS__IDLE
);
}
mInfo
(
"mq consumer:%ld
, status change from %d %d"
,
pRebConsumer
->
consumerId
,
status
,
pRebConsumer
->
status
);
mInfo
(
"mq consumer:%ld, status change from %d %d"
,
pRebConsumer
->
consumerId
,
status
,
pRebConsumer
->
status
);
SSdbRaw
*
pConsumerRaw
=
mndConsumerActionEncode
(
pRebConsumer
);
sdbSetRawStatus
(
pConsumerRaw
,
SDB_STATUS_READY
);
...
...
@@ -432,7 +445,7 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
}
//assign to vgroup
if
(
taosArrayGetSize
(
unassignedVgStash
)
!=
0
)
{
if
(
taosArrayGetSize
(
pSub
->
unassignedVg
)
!=
0
)
{
for
(
int
i
=
0
;
i
<
consumerNum
;
i
++
)
{
SMqSubConsumer
*
pSubConsumer
=
taosArrayGet
(
pSub
->
consumers
,
i
);
int
vgThisConsumerBeforeRb
=
taosArrayGetSize
(
pSubConsumer
->
vgInfo
);
...
...
@@ -440,14 +453,13 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
if
(
i
<
imbalanceVg
)
vgThisConsumerAfterRb
=
vgEachConsumer
+
1
;
else
vgThisConsumerAfterRb
=
vgEachConsumer
;
while
(
taosArrayGetSize
(
pSubConsumer
->
vgInfo
)
<
vgThisConsumer
Before
Rb
)
{
SMqConsumerEp
*
pConsumerEp
=
taosArrayPop
(
unassignedVgStash
);
while
(
taosArrayGetSize
(
pSubConsumer
->
vgInfo
)
<
vgThisConsumer
After
Rb
)
{
SMqConsumerEp
*
pConsumerEp
=
taosArrayPop
(
pSub
->
unassignedVg
);
ASSERT
(
pConsumerEp
!=
NULL
);
ASSERT
(
pConsumerEp
->
consumerId
==
pSubConsumer
->
consumerId
);
pConsumerEp
->
oldConsumerId
=
pConsumerEp
->
consumerId
;
pConsumerEp
->
consumerId
=
pSubConsumer
->
consumerId
;
taosArrayPush
(
pSubConsumer
->
vgInfo
,
pConsumerEp
);
mInfo
(
"mq consumer:%ld , assign vgroup %d, previously assigned to consumer %ld"
,
pSubConsumer
->
consumerId
,
pConsumerEp
->
vgId
,
pConsumerEp
->
oldConsumerId
);
...
...
@@ -455,7 +467,7 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
}
}
}
ASSERT
(
taosArrayGetSize
(
unassignedVgStash
)
==
0
);
ASSERT
(
taosArrayGetSize
(
pSub
->
unassignedVg
)
==
0
);
// TODO: log rebalance statistics
SSdbRaw
*
pSubRaw
=
mndSubActionEncode
(
pSub
);
...
...
@@ -1019,7 +1031,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
pConsumerEp
->
consumerId
=
consumerId
;
taosArrayPush
(
mqSubConsumer
.
vgInfo
,
pConsumerEp
);
mndPersistMqSetConnReq
(
pMnode
,
pTrans
,
pTopic
,
cgroup
,
pConsumerEp
);
atomic_store_32
(
&
pConsumer
->
hbS
tatus
,
MQ_CONSUMER_STATUS__ACTIVE
);
atomic_store_32
(
&
pConsumer
->
s
tatus
,
MQ_CONSUMER_STATUS__ACTIVE
);
}
SSdbRaw
*
pRaw
=
mndSubActionEncode
(
pSub
);
...
...
source/dnode/mnode/impl/src/mnode.c
浏览文件 @
f0d7d1b3
...
...
@@ -389,7 +389,7 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) {
return
NULL
;
}
if
(
pRpcMsg
->
msgType
!=
TDMT_MND_TRANS
&&
pRpcMsg
->
msgType
!=
TDMT_MND_MQ_TIMER
)
{
if
(
pRpcMsg
->
msgType
!=
TDMT_MND_TRANS
&&
pRpcMsg
->
msgType
!=
TDMT_MND_MQ_TIMER
&&
pRpcMsg
->
msgType
!=
TDMT_MND_MQ_DO_REBALANCE
)
{
SRpcConnInfo
connInfo
=
{
0
};
if
((
pRpcMsg
->
msgType
&
1U
)
&&
rpcGetConnInfo
(
pRpcMsg
->
handle
,
&
connInfo
)
!=
0
)
{
taosFreeQitem
(
pMsg
);
...
...
source/dnode/vnode/inc/tq.h
浏览文件 @
f0d7d1b3
...
...
@@ -198,6 +198,7 @@ int tqCommit(STQ*);
int32_t
tqProcessConsumeReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
);
int32_t
tqProcessSetConnReq
(
STQ
*
pTq
,
char
*
msg
);
int32_t
tqProcessRebReq
(
STQ
*
pTq
,
char
*
msg
);
#ifdef __cplusplus
}
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
f0d7d1b3
...
...
@@ -281,6 +281,19 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
return
0
;
}
int32_t
tqProcessRebReq
(
STQ
*
pTq
,
char
*
msg
)
{
SMqMVRebReq
req
=
{
0
};
tDecodeSMqMVRebReq
(
msg
,
&
req
);
STqConsumerHandle
*
pConsumer
=
tqHandleGet
(
pTq
->
tqMeta
,
req
.
oldConsumerId
);
ASSERT
(
pConsumer
);
tqHandleMovePut
(
pTq
->
tqMeta
,
req
.
newConsumerId
,
pConsumer
);
tqHandleCommit
(
pTq
->
tqMeta
,
req
.
newConsumerId
);
tqHandlePurge
(
pTq
->
tqMeta
,
req
.
oldConsumerId
);
terrno
=
TSDB_CODE_SUCCESS
;
return
0
;
}
int32_t
tqProcessSetConnReq
(
STQ
*
pTq
,
char
*
msg
)
{
SMqSetCVgReq
req
=
{
0
};
tDecodeSMqSetCVgReq
(
msg
,
&
req
);
...
...
source/dnode/vnode/src/vnd/vnodeWrite.c
浏览文件 @
f0d7d1b3
...
...
@@ -130,6 +130,11 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
// TODO: handle error
}
}
break
;
case
TDMT_VND_MQ_REB
:
{
if
(
tqProcessRebReq
(
pVnode
->
pTq
,
POINTER_SHIFT
(
ptr
,
sizeof
(
SMsgHead
)))
<
0
)
{
}
}
break
;
default:
ASSERT
(
0
);
break
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录